From 235122e4c6707019d1f90252095e2a5ffa0009cb Mon Sep 17 00:00:00 2001 From: SungJin1212 Date: Mon, 2 Jun 2025 19:40:55 +0900 Subject: [PATCH 1/3] Refactor queryapi Signed-off-by: SungJin1212 --- pkg/api/handlers.go | 4 +- .../query_api.go => query/handler.go} | 93 +++++++--- .../handler_test.go} | 26 +-- pkg/api/query/response.go | 11 ++ pkg/api/query/util.go | 81 +++++++++ pkg/api/queryapi/util.go | 120 ------------- pkg/api/queryapi/util_test.go | 15 -- .../tripperware/instantquery/instant_query.go | 7 +- .../tripperware/query_attribute_matcher.go | 10 +- .../tripperware/queryrange/query_range.go | 22 ++- .../queryrange/query_range_test.go | 8 +- pkg/util/api/errors.go | 13 ++ pkg/util/api/parse.go | 161 ++++++++++++++++++ pkg/util/api/parse_test.go | 118 +++++++++++++ pkg/util/api/response.go | 10 ++ pkg/util/time.go | 77 --------- pkg/util/time_test.go | 107 ------------ 17 files changed, 498 insertions(+), 385 deletions(-) rename pkg/api/{queryapi/query_api.go => query/handler.go} (70%) rename pkg/api/{queryapi/query_api_test.go => query/handler_test.go} (91%) create mode 100644 pkg/api/query/response.go create mode 100644 pkg/api/query/util.go delete mode 100644 pkg/api/queryapi/util.go delete mode 100644 pkg/api/queryapi/util_test.go create mode 100644 pkg/util/api/errors.go create mode 100644 pkg/util/api/parse.go create mode 100644 pkg/util/api/parse_test.go diff --git a/pkg/api/handlers.go b/pkg/api/handlers.go index 5173affb197..01a4e2cbb03 100644 --- a/pkg/api/handlers.go +++ b/pkg/api/handlers.go @@ -25,7 +25,7 @@ import ( "github.com/weaveworks/common/instrument" "github.com/weaveworks/common/middleware" - "github.com/cortexproject/cortex/pkg/api/queryapi" + "github.com/cortexproject/cortex/pkg/api/query" "github.com/cortexproject/cortex/pkg/querier" "github.com/cortexproject/cortex/pkg/querier/codec" "github.com/cortexproject/cortex/pkg/querier/stats" @@ -280,7 +280,7 @@ func NewQuerierHandler( legacyPromRouter := route.New().WithPrefix(path.Join(legacyPrefix, "/api/v1")) api.Register(legacyPromRouter) - queryAPI := queryapi.NewQueryAPI(engine, translateSampleAndChunkQueryable, statsRenderer, logger, codecs, corsOrigin) + queryAPI := query.NewQueryAPI(engine, translateSampleAndChunkQueryable, statsRenderer, logger, codecs, corsOrigin) // TODO(gotjosh): This custom handler is temporary until we're able to vendor the changes in: // https://github.com/prometheus/prometheus/pull/7125/files diff --git a/pkg/api/queryapi/query_api.go b/pkg/api/query/handler.go similarity index 70% rename from pkg/api/queryapi/query_api.go rename to pkg/api/query/handler.go index e3488f10387..622141ce37b 100644 --- a/pkg/api/queryapi/query_api.go +++ b/pkg/api/query/handler.go @@ -1,7 +1,8 @@ -package queryapi +package query import ( "context" + "errors" "fmt" "net/http" "time" @@ -9,17 +10,16 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/grafana/regexp" + jsoniter "github.com/json-iterator/go" "github.com/munnerz/goautoneg" "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/util/annotations" "github.com/prometheus/prometheus/util/httputil" v1 "github.com/prometheus/prometheus/web/api/v1" - "github.com/weaveworks/common/httpgrpc" "github.com/cortexproject/cortex/pkg/engine" "github.com/cortexproject/cortex/pkg/querier" - "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/api" ) @@ -53,56 +53,57 @@ func NewQueryAPI( } func (q *QueryAPI) RangeQueryHandler(r *http.Request) (result apiFuncResult) { - // TODO(Sungjin1212): Change to emit basic error (not gRPC) - start, err := util.ParseTime(r.FormValue("start")) + start, err := api.ParseTime(r.FormValue("start")) if err != nil { return invalidParamError(err, "start") } - end, err := util.ParseTime(r.FormValue("end")) + end, err := api.ParseTime(r.FormValue("end")) if err != nil { return invalidParamError(err, "end") } - if end < start { - return invalidParamError(ErrEndBeforeStart, "end") + + if end.Before(start) { + return invalidParamError(errors.New("end timestamp must not be before start time"), "end") } - step, err := util.ParseDurationMs(r.FormValue("step")) + step, err := api.ParseDuration(r.FormValue("step")) if err != nil { return invalidParamError(err, "step") } if step <= 0 { - return invalidParamError(ErrNegativeStep, "step") + return invalidParamError(errors.New("zero or negative query resolution step widths are not accepted. Try a positive integer"), "step") } // For safety, limit the number of returned points per timeseries. // This is sufficient for 60s resolution for a week or 1h resolution for a year. - if (end-start)/step > 11000 { - return apiFuncResult{nil, &apiError{errorBadData, ErrStepTooSmall}, nil, nil} + if end.Sub(start)/step > 11000 { + err := errors.New("exceeded maximum resolution of 11,000 points per timeseries. Try decreasing the query resolution (?step=XX)") + return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil} } ctx := r.Context() if to := r.FormValue("timeout"); to != "" { var cancel context.CancelFunc - timeout, err := util.ParseDurationMs(to) + timeout, err := api.ParseDuration(to) if err != nil { return invalidParamError(err, "timeout") } - ctx, cancel = context.WithTimeout(ctx, convertMsToDuration(timeout)) + ctx, cancel = context.WithTimeout(ctx, timeout) defer cancel() } - opts, err := extractQueryOpts(r) + opts, err := api.ExtractQueryOpts(r) if err != nil { return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil} } ctx = engine.AddEngineTypeToContext(ctx, r) ctx = querier.AddBlockStoreTypeToContext(ctx, r.Header.Get(querier.BlockStoreTypeHeader)) - qry, err := q.queryEngine.NewRangeQuery(ctx, q.queryable, opts, r.FormValue("query"), convertMsToTime(start), convertMsToTime(end), convertMsToDuration(step)) + qry, err := q.queryEngine.NewRangeQuery(ctx, q.queryable, opts, r.FormValue("query"), start, end, step) if err != nil { - return invalidParamError(httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error()), "query") + return invalidParamError(err, "query") } // From now on, we must only return with a finalizer in the result (to // be called by the caller) or call qry.Close ourselves (which is @@ -131,8 +132,7 @@ func (q *QueryAPI) RangeQueryHandler(r *http.Request) (result apiFuncResult) { } func (q *QueryAPI) InstantQueryHandler(r *http.Request) (result apiFuncResult) { - // TODO(Sungjin1212): Change to emit basic error (not gRPC) - ts, err := util.ParseTimeParam(r, "time", q.now().Unix()) + ts, err := api.ParseTimeParam(r, "time", q.now()) if err != nil { return invalidParamError(err, "time") } @@ -140,25 +140,25 @@ func (q *QueryAPI) InstantQueryHandler(r *http.Request) (result apiFuncResult) { ctx := r.Context() if to := r.FormValue("timeout"); to != "" { var cancel context.CancelFunc - timeout, err := util.ParseDurationMs(to) + timeout, err := api.ParseDuration(to) if err != nil { return invalidParamError(err, "timeout") } - ctx, cancel = context.WithDeadline(ctx, q.now().Add(convertMsToDuration(timeout))) + ctx, cancel = context.WithDeadline(ctx, q.now().Add(timeout)) defer cancel() } - opts, err := extractQueryOpts(r) + opts, err := api.ExtractQueryOpts(r) if err != nil { return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil} } ctx = engine.AddEngineTypeToContext(ctx, r) ctx = querier.AddBlockStoreTypeToContext(ctx, r.Header.Get(querier.BlockStoreTypeHeader)) - qry, err := q.queryEngine.NewInstantQuery(ctx, q.queryable, opts, r.FormValue("query"), convertMsToTime(ts)) + qry, err := q.queryEngine.NewInstantQuery(ctx, q.queryable, opts, r.FormValue("query"), ts) if err != nil { - return invalidParamError(httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error()), "query") + return invalidParamError(err, "query") } // From now on, we must only return with a finalizer in the result (to @@ -197,7 +197,7 @@ func (q *QueryAPI) Wrap(f apiFunc) http.HandlerFunc { } if result.err != nil { - api.RespondFromGRPCError(q.logger, w, result.err.err) + q.respondError(w, result.err, result.data) return } @@ -209,6 +209,47 @@ func (q *QueryAPI) Wrap(f apiFunc) http.HandlerFunc { } } +func (q *QueryAPI) respondError(w http.ResponseWriter, apiErr *apiError, data interface{}) { + json := jsoniter.ConfigCompatibleWithStandardLibrary + b, err := json.Marshal(&Response{ + Status: statusError, + ErrorType: apiErr.typ, + Error: apiErr.err.Error(), + Data: data, + }) + if err != nil { + level.Error(q.logger).Log("error marshaling json response", "err", err) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + var code int + switch apiErr.typ { + case errorBadData: + code = http.StatusBadRequest + case errorExec: + code = http.StatusUnprocessableEntity + case errorCanceled: + code = statusClientClosedConnection + case errorTimeout: + code = http.StatusServiceUnavailable + case errorInternal: + code = http.StatusInternalServerError + case errorNotFound: + code = http.StatusNotFound + case errorNotAcceptable: + code = http.StatusNotAcceptable + default: + code = http.StatusInternalServerError + } + + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(code) + if n, err := w.Write(b); err != nil { + level.Error(q.logger).Log("error writing response", "bytesWritten", n, "err", err) + } +} + func (q *QueryAPI) respond(w http.ResponseWriter, req *http.Request, data interface{}, warnings annotations.Annotations, query string) { warn, info := warnings.AsStrings(query, 10, 10) @@ -221,7 +262,7 @@ func (q *QueryAPI) respond(w http.ResponseWriter, req *http.Request, data interf codec, err := q.negotiateCodec(req, resp) if err != nil { - api.RespondFromGRPCError(q.logger, w, httpgrpc.Errorf(http.StatusNotAcceptable, "%s", &apiError{errorNotAcceptable, err})) + q.respondError(w, &apiError{errorNotAcceptable, err}, nil) return } diff --git a/pkg/api/queryapi/query_api_test.go b/pkg/api/query/handler_test.go similarity index 91% rename from pkg/api/queryapi/query_api_test.go rename to pkg/api/query/handler_test.go index 028184a12b8..ca66d2cad8d 100644 --- a/pkg/api/queryapi/query_api_test.go +++ b/pkg/api/query/handler_test.go @@ -1,4 +1,4 @@ -package queryapi +package query import ( "context" @@ -63,7 +63,7 @@ func (mockQuerier) Close() error { return nil } -func Test_CustomAPI(t *testing.T) { +func Test_QueryAPI(t *testing.T) { engine := promql.NewEngine(promql.EngineOpts{ MaxSamples: 100, Timeout: time.Second * 2, @@ -94,25 +94,25 @@ func Test_CustomAPI(t *testing.T) { name: "[Range Query] empty start", path: "/api/v1/query_range?end=1536673680&query=test&step=5", expectedCode: http.StatusBadRequest, - expectedBody: "{\"status\":\"error\",\"errorType\":\"bad_data\",\"error\":\"invalid parameter \\\"start\\\"; cannot parse \\\"\\\" to a valid timestamp\"}", + expectedBody: "{\"status\":\"error\",\"errorType\":\"bad_data\",\"error\":\"invalid parameter \\\"start\\\": cannot parse \\\"\\\" to a valid timestamp\"}", }, { name: "[Range Query] empty end", path: "/api/v1/query_range?query=test&start=1536673665&step=5", expectedCode: http.StatusBadRequest, - expectedBody: "{\"status\":\"error\",\"errorType\":\"bad_data\",\"error\":\"invalid parameter \\\"end\\\"; cannot parse \\\"\\\" to a valid timestamp\"}", + expectedBody: "{\"status\":\"error\",\"errorType\":\"bad_data\",\"error\":\"invalid parameter \\\"end\\\": cannot parse \\\"\\\" to a valid timestamp\"}", }, { name: "[Range Query] start is greater than end", path: "/api/v1/query_range?end=1536673680&query=test&start=1536673681&step=5", expectedCode: http.StatusBadRequest, - expectedBody: "{\"status\":\"error\",\"errorType\":\"bad_data\",\"error\":\"invalid parameter \\\"end\\\"; end timestamp must not be before start time\"}", + expectedBody: "{\"status\":\"error\",\"errorType\":\"bad_data\",\"error\":\"invalid parameter \\\"end\\\": end timestamp must not be before start time\"}", }, { name: "[Range Query] negative step", path: "/api/v1/query_range?end=1536673680&query=test&start=1536673665&step=-1", expectedCode: http.StatusBadRequest, - expectedBody: "{\"status\":\"error\",\"errorType\":\"bad_data\",\"error\":\"invalid parameter \\\"step\\\"; zero or negative query resolution step widths are not accepted. Try a positive integer\"}", + expectedBody: "{\"status\":\"error\",\"errorType\":\"bad_data\",\"error\":\"invalid parameter \\\"step\\\": zero or negative query resolution step widths are not accepted. Try a positive integer\"}", }, { name: "[Range Query] returned points are over 11000", @@ -124,19 +124,19 @@ func Test_CustomAPI(t *testing.T) { name: "[Range Query] empty query", path: "/api/v1/query_range?end=1536673680&start=1536673665&step=5", expectedCode: http.StatusBadRequest, - expectedBody: "{\"status\":\"error\",\"errorType\":\"bad_data\",\"error\":\"invalid parameter \\\"query\\\"; unknown position: parse error: no expression found in input\"}", + expectedBody: "{\"status\":\"error\",\"errorType\":\"bad_data\",\"error\":\"invalid parameter \\\"query\\\": unknown position: parse error: no expression found in input\"}", }, { name: "[Range Query] invalid lookback delta", path: "/api/v1/query_range?end=1536673680&query=test&start=1536673665&step=5&lookback_delta=dummy", expectedCode: http.StatusBadRequest, - expectedBody: "{\"status\":\"error\",\"errorType\":\"bad_data\",\"error\":\"error parsing lookback delta duration: rpc error: code = Code(400) desc = cannot parse \\\"dummy\\\" to a valid duration\"}", + expectedBody: "{\"status\":\"error\",\"errorType\":\"bad_data\",\"error\":\"error parsing lookback delta duration: cannot parse \\\"dummy\\\" to a valid duration\"}", }, { name: "[Range Query] invalid timeout delta", path: "/api/v1/query_range?end=1536673680&query=test&start=1536673665&step=5&timeout=dummy", expectedCode: http.StatusBadRequest, - expectedBody: "{\"status\":\"error\",\"errorType\":\"bad_data\",\"error\":\"invalid parameter \\\"timeout\\\"; cannot parse \\\"dummy\\\" to a valid duration\"}", + expectedBody: "{\"status\":\"error\",\"errorType\":\"bad_data\",\"error\":\"invalid parameter \\\"timeout\\\": cannot parse \\\"dummy\\\" to a valid duration\"}", }, { name: "[Range Query] normal case", @@ -148,19 +148,19 @@ func Test_CustomAPI(t *testing.T) { name: "[Instant Query] empty query", path: "/api/v1/query", expectedCode: http.StatusBadRequest, - expectedBody: "{\"status\":\"error\",\"errorType\":\"bad_data\",\"error\":\"invalid parameter \\\"query\\\"; unknown position: parse error: no expression found in input\"}", + expectedBody: "{\"status\":\"error\",\"errorType\":\"bad_data\",\"error\":\"invalid parameter \\\"query\\\": unknown position: parse error: no expression found in input\"}", }, { name: "[Instant Query] invalid lookback delta", path: "/api/v1/query?lookback_delta=dummy", expectedCode: http.StatusBadRequest, - expectedBody: "{\"status\":\"error\",\"errorType\":\"bad_data\",\"error\":\"error parsing lookback delta duration: rpc error: code = Code(400) desc = cannot parse \\\"dummy\\\" to a valid duration\"}", + expectedBody: "{\"status\":\"error\",\"errorType\":\"bad_data\",\"error\":\"error parsing lookback delta duration: cannot parse \\\"dummy\\\" to a valid duration\"}", }, { name: "[Instant Query] invalid timeout", path: "/api/v1/query?timeout=dummy", expectedCode: http.StatusBadRequest, - expectedBody: "{\"status\":\"error\",\"errorType\":\"bad_data\",\"error\":\"invalid parameter \\\"timeout\\\"; cannot parse \\\"dummy\\\" to a valid duration\"}", + expectedBody: "{\"status\":\"error\",\"errorType\":\"bad_data\",\"error\":\"invalid parameter \\\"timeout\\\": cannot parse \\\"dummy\\\" to a valid duration\"}", }, { name: "[Instant Query] normal case", @@ -243,7 +243,7 @@ func Test_InvalidCodec(t *testing.T) { require.Equal(t, http.StatusNotAcceptable, rec.Code) } -func Test_CustomAPI_StatsRenderer(t *testing.T) { +func Test_QueryAPI_StatsRenderer(t *testing.T) { engine := promql.NewEngine(promql.EngineOpts{ MaxSamples: 100, Timeout: time.Second * 2, diff --git a/pkg/api/query/response.go b/pkg/api/query/response.go new file mode 100644 index 00000000000..29ec12ef113 --- /dev/null +++ b/pkg/api/query/response.go @@ -0,0 +1,11 @@ +package query + +// Response defines the Prometheus response format. +type Response struct { + Status string `json:"status"` + Data interface{} `json:"data,omitempty"` + ErrorType errorType `json:"errorType,omitempty"` + Error string `json:"error,omitempty"` + Warnings []string `json:"warnings,omitempty"` + Infos []string `json:"infos,omitempty"` +} diff --git a/pkg/api/query/util.go b/pkg/api/query/util.go new file mode 100644 index 00000000000..0e63d3042a0 --- /dev/null +++ b/pkg/api/query/util.go @@ -0,0 +1,81 @@ +package query + +import ( + "context" + "errors" + "fmt" + "net/http" + + "github.com/prometheus/prometheus/promql" + "github.com/prometheus/prometheus/util/annotations" +) + +const ( + statusSuccess = "success" + statusError = "error" + + // Non-standard status code (originally introduced by nginx) for the case when a client closes + // the connection while the server is still processing the request. + statusClientClosedConnection = 499 +) + +type errorType string + +const ( + errorTimeout errorType = "timeout" + errorCanceled errorType = "canceled" + errorExec errorType = "execution" + errorBadData errorType = "bad_data" + errorInternal errorType = "internal" + errorNotFound errorType = "not_found" + errorNotAcceptable errorType = "not_acceptable" +) + +type apiError struct { + typ errorType + err error +} + +func (e *apiError) Error() string { + return fmt.Sprintf("%s: %s", e.typ, e.err) +} + +func returnAPIError(err error) *apiError { + if err == nil { + return nil + } + + var eqc promql.ErrQueryCanceled + var eqt promql.ErrQueryTimeout + var es promql.ErrStorage + + switch { + case errors.As(err, &eqc): + return &apiError{errorCanceled, err} + case errors.As(err, &eqt): + return &apiError{errorTimeout, err} + case errors.As(err, &es): + return &apiError{errorInternal, err} + } + + if errors.Is(err, context.Canceled) { + return &apiError{errorCanceled, err} + } + + return &apiError{errorExec, err} +} + +type apiFuncResult struct { + data interface{} + err *apiError + warnings annotations.Annotations + finalizer func() +} + +type apiFunc func(r *http.Request) apiFuncResult + +func invalidParamError(err error, parameter string) apiFuncResult { + return apiFuncResult{nil, &apiError{ + errorBadData, fmt.Errorf("invalid parameter %q: %w", parameter, err), + }, nil, nil} +} diff --git a/pkg/api/queryapi/util.go b/pkg/api/queryapi/util.go deleted file mode 100644 index 9d85b8a96c7..00000000000 --- a/pkg/api/queryapi/util.go +++ /dev/null @@ -1,120 +0,0 @@ -package queryapi - -import ( - "context" - "errors" - "fmt" - "net/http" - "time" - - "github.com/gogo/status" - "github.com/prometheus/prometheus/promql" - "github.com/prometheus/prometheus/util/annotations" - "github.com/weaveworks/common/httpgrpc" - - "github.com/cortexproject/cortex/pkg/util" -) - -var ( - ErrEndBeforeStart = httpgrpc.Errorf(http.StatusBadRequest, "%s", "end timestamp must not be before start time") - ErrNegativeStep = httpgrpc.Errorf(http.StatusBadRequest, "%s", "zero or negative query resolution step widths are not accepted. Try a positive integer") - ErrStepTooSmall = httpgrpc.Errorf(http.StatusBadRequest, "%s", "exceeded maximum resolution of 11,000 points per timeseries. Try decreasing the query resolution (?step=XX)") -) - -func extractQueryOpts(r *http.Request) (promql.QueryOpts, error) { - var duration time.Duration - - if strDuration := r.FormValue("lookback_delta"); strDuration != "" { - parsedDuration, err := util.ParseDurationMs(strDuration) - if err != nil { - return nil, httpgrpc.Errorf(http.StatusBadRequest, "error parsing lookback delta duration: %v", err) - } - duration = convertMsToDuration(parsedDuration) - } - - return promql.NewPrometheusQueryOpts(r.FormValue("stats") == "all", duration), nil -} - -const ( - statusSuccess = "success" - - // Non-standard status code (originally introduced by nginx) for the case when a client closes - // the connection while the server is still processing the request. - statusClientClosedConnection = 499 -) - -type errorType string - -const ( - errorTimeout errorType = "timeout" - errorCanceled errorType = "canceled" - errorExec errorType = "execution" - errorBadData errorType = "bad_data" - errorInternal errorType = "internal" - errorNotAcceptable errorType = "not_acceptable" -) - -type apiError struct { - typ errorType - err error -} - -func (e *apiError) Error() string { - return fmt.Sprintf("%s: %s", e.typ, e.err) -} - -func returnAPIError(err error) *apiError { - if err == nil { - return nil - } - - var eqc promql.ErrQueryCanceled - var eqt promql.ErrQueryTimeout - var es promql.ErrStorage - - switch { - case errors.As(err, &eqc): - return &apiError{errorCanceled, httpgrpc.Errorf(statusClientClosedConnection, "%v", err)} - case errors.As(err, &eqt): - return &apiError{errorTimeout, httpgrpc.Errorf(http.StatusServiceUnavailable, "%v", err)} - case errors.As(err, &es): - return &apiError{errorInternal, httpgrpc.Errorf(http.StatusInternalServerError, "%v", err)} - } - - if errors.Is(err, context.Canceled) { - return &apiError{errorCanceled, httpgrpc.Errorf(statusClientClosedConnection, "%v", err)} - } - - return &apiError{errorExec, httpgrpc.Errorf(http.StatusUnprocessableEntity, "%v", err)} -} - -type apiFuncResult struct { - data interface{} - err *apiError - warnings annotations.Annotations - finalizer func() -} - -type apiFunc func(r *http.Request) apiFuncResult - -func invalidParamError(err error, parameter string) apiFuncResult { - return apiFuncResult{nil, &apiError{ - errorBadData, DecorateWithParamName(err, parameter), - }, nil, nil} -} - -func convertMsToTime(unixMs int64) time.Time { - return time.Unix(0, unixMs*int64(time.Millisecond)) -} - -func convertMsToDuration(unixMs int64) time.Duration { - return time.Duration(unixMs) * time.Millisecond -} - -func DecorateWithParamName(err error, field string) error { - errTmpl := "invalid parameter %q; %v" - if status, ok := status.FromError(err); ok { - return httpgrpc.Errorf(int(status.Code()), errTmpl, field, status.Message()) - } - return fmt.Errorf(errTmpl, field, err) -} diff --git a/pkg/api/queryapi/util_test.go b/pkg/api/queryapi/util_test.go deleted file mode 100644 index f3caf3ec8e7..00000000000 --- a/pkg/api/queryapi/util_test.go +++ /dev/null @@ -1,15 +0,0 @@ -package queryapi - -import ( - "testing" - "time" - - "github.com/stretchr/testify/require" -) - -func Test_Convert(t *testing.T) { - time := time.Now().UnixMilli() - - require.Equal(t, time, convertMsToTime(time).UnixMilli()) - require.Equal(t, time, convertMsToDuration(time).Milliseconds()) -} diff --git a/pkg/querier/tripperware/instantquery/instant_query.go b/pkg/querier/tripperware/instantquery/instant_query.go index 54fe4aeba0d..025ef7fafd0 100644 --- a/pkg/querier/tripperware/instantquery/instant_query.go +++ b/pkg/querier/tripperware/instantquery/instant_query.go @@ -17,10 +17,9 @@ import ( v1 "github.com/prometheus/prometheus/web/api/v1" "github.com/weaveworks/common/httpgrpc" - "github.com/cortexproject/cortex/pkg/api/queryapi" "github.com/cortexproject/cortex/pkg/querier/stats" "github.com/cortexproject/cortex/pkg/querier/tripperware" - "github.com/cortexproject/cortex/pkg/util" + "github.com/cortexproject/cortex/pkg/util/api" "github.com/cortexproject/cortex/pkg/util/limiter" "github.com/cortexproject/cortex/pkg/util/spanlogger" ) @@ -64,9 +63,9 @@ func NewInstantQueryCodec(compressionStr string, defaultCodecTypeStr string) ins func (c instantQueryCodec) DecodeRequest(_ context.Context, r *http.Request, forwardHeaders []string) (tripperware.Request, error) { result := tripperware.PrometheusRequest{Headers: map[string][]string{}} var err error - result.Time, err = util.ParseTimeParam(r, "time", c.now().Unix()) + result.Time, err = api.ParseTimeParamMillis(r, "time", c.now()) if err != nil { - return nil, queryapi.DecorateWithParamName(err, "time") + return nil, api.DecorateWithParamName(err, "time") } result.Query = r.FormValue("query") diff --git a/pkg/querier/tripperware/query_attribute_matcher.go b/pkg/querier/tripperware/query_attribute_matcher.go index 7edd9f0b098..89d327f062b 100644 --- a/pkg/querier/tripperware/query_attribute_matcher.go +++ b/pkg/querier/tripperware/query_attribute_matcher.go @@ -10,7 +10,7 @@ import ( "github.com/weaveworks/common/httpgrpc" "github.com/cortexproject/cortex/pkg/querier/stats" - "github.com/cortexproject/cortex/pkg/util" + "github.com/cortexproject/cortex/pkg/util/api" "github.com/cortexproject/cortex/pkg/util/validation" ) @@ -28,7 +28,7 @@ func rejectQueryOrSetPriority(r *http.Request, now time.Time, lookbackDelta time if err != nil { return httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error()) } - minTime, maxTime := util.FindMinMaxTime(r, expr, lookbackDelta, now) + minTime, maxTime := api.FindMinMaxTime(r, expr, lookbackDelta, now) if queryReject := limits.QueryRejection(userStr); queryReject.Enabled && query != "" { for _, attribute := range queryReject.QueryAttributes { @@ -172,8 +172,8 @@ func matchAttributeForMetadataQuery(attribute validation.QueryAttribute, op stri } } - startTime, _ := util.ParseTime(r.FormValue("start")) - endTime, _ := util.ParseTime(r.FormValue("end")) + startTime, _ := api.ParseTimeMillis(r.FormValue("start")) + endTime, _ := api.ParseTimeMillis(r.FormValue("end")) if attribute.TimeWindow.Start != 0 || attribute.TimeWindow.End != 0 { matched = true @@ -237,7 +237,7 @@ func isWithinTimeRangeAttribute(limit validation.TimeRangeLimit, startTime, endT func isWithinQueryStepLimit(queryStepLimit validation.QueryStepLimit, r *http.Request) bool { - step, err := util.ParseDurationMs(r.FormValue("step")) + step, err := api.ParseDurationMillis(r.FormValue("step")) if err != nil { return false } diff --git a/pkg/querier/tripperware/queryrange/query_range.go b/pkg/querier/tripperware/queryrange/query_range.go index 9d82031fc0b..8633ec9429a 100644 --- a/pkg/querier/tripperware/queryrange/query_range.go +++ b/pkg/querier/tripperware/queryrange/query_range.go @@ -16,11 +16,9 @@ import ( "github.com/prometheus/common/model" "github.com/weaveworks/common/httpgrpc" - "github.com/cortexproject/cortex/pkg/api/queryapi" "github.com/cortexproject/cortex/pkg/querier/stats" "github.com/cortexproject/cortex/pkg/querier/tripperware" - "github.com/cortexproject/cortex/pkg/util" - + "github.com/cortexproject/cortex/pkg/util/api" "github.com/cortexproject/cortex/pkg/util/limiter" "github.com/cortexproject/cortex/pkg/util/spanlogger" ) @@ -98,33 +96,33 @@ func (c prometheusCodec) MergeResponse(ctx context.Context, req tripperware.Requ func (c prometheusCodec) DecodeRequest(_ context.Context, r *http.Request, forwardHeaders []string) (tripperware.Request, error) { result := tripperware.PrometheusRequest{Headers: map[string][]string{}} var err error - result.Start, err = util.ParseTime(r.FormValue("start")) + result.Start, err = api.ParseTimeMillis(r.FormValue("start")) if err != nil { - return nil, queryapi.DecorateWithParamName(err, "start") + return nil, api.DecorateWithParamName(err, "start") } - result.End, err = util.ParseTime(r.FormValue("end")) + result.End, err = api.ParseTimeMillis(r.FormValue("end")) if err != nil { - return nil, queryapi.DecorateWithParamName(err, "end") + return nil, api.DecorateWithParamName(err, "end") } if result.End < result.Start { - return nil, queryapi.ErrEndBeforeStart + return nil, api.ErrEndBeforeStart } - result.Step, err = util.ParseDurationMs(r.FormValue("step")) + result.Step, err = api.ParseDurationMillis(r.FormValue("step")) if err != nil { - return nil, queryapi.DecorateWithParamName(err, "step") + return nil, api.DecorateWithParamName(err, "step") } if result.Step <= 0 { - return nil, queryapi.ErrNegativeStep + return nil, api.ErrNegativeStep } // For safety, limit the number of returned points per timeseries. // This is sufficient for 60s resolution for a week or 1h resolution for a year. if (result.End-result.Start)/result.Step > 11000 { - return nil, queryapi.ErrStepTooSmall + return nil, api.ErrStepTooSmall } result.Query = r.FormValue("query") diff --git a/pkg/querier/tripperware/queryrange/query_range_test.go b/pkg/querier/tripperware/queryrange/query_range_test.go index 1f3ebb137d8..f8764bb619a 100644 --- a/pkg/querier/tripperware/queryrange/query_range_test.go +++ b/pkg/querier/tripperware/queryrange/query_range_test.go @@ -20,9 +20,9 @@ import ( "github.com/weaveworks/common/httpgrpc" "github.com/weaveworks/common/user" - "github.com/cortexproject/cortex/pkg/api/queryapi" "github.com/cortexproject/cortex/pkg/cortexpb" "github.com/cortexproject/cortex/pkg/querier/tripperware" + "github.com/cortexproject/cortex/pkg/util/api" ) func sortPrometheusResponseHeader(headers []*tripperware.PrometheusResponseHeader) { @@ -56,7 +56,7 @@ func TestRequest(t *testing.T) { }, { url: "api/v1/query_range?start=123&end=0", - expectedErr: queryapi.ErrEndBeforeStart, + expectedErr: api.ErrEndBeforeStart, }, { url: "api/v1/query_range?start=123&end=456&step=baz", @@ -64,11 +64,11 @@ func TestRequest(t *testing.T) { }, { url: "api/v1/query_range?start=123&end=456&step=-1", - expectedErr: queryapi.ErrNegativeStep, + expectedErr: api.ErrNegativeStep, }, { url: "api/v1/query_range?start=0&end=11001&step=1", - expectedErr: queryapi.ErrStepTooSmall, + expectedErr: api.ErrStepTooSmall, }, } { tc := tc diff --git a/pkg/util/api/errors.go b/pkg/util/api/errors.go new file mode 100644 index 00000000000..863970a13d1 --- /dev/null +++ b/pkg/util/api/errors.go @@ -0,0 +1,13 @@ +package api + +import ( + "net/http" + + "github.com/weaveworks/common/httpgrpc" +) + +var ( + ErrEndBeforeStart = httpgrpc.Errorf(http.StatusBadRequest, "%s", "end timestamp must not be before start time") + ErrNegativeStep = httpgrpc.Errorf(http.StatusBadRequest, "%s", "zero or negative query resolution step widths are not accepted. Try a positive integer") + ErrStepTooSmall = httpgrpc.Errorf(http.StatusBadRequest, "%s", "exceeded maximum resolution of 11,000 points per timeseries. Try decreasing the query resolution (?step=XX)") +) diff --git a/pkg/util/api/parse.go b/pkg/util/api/parse.go new file mode 100644 index 00000000000..e059c95fd03 --- /dev/null +++ b/pkg/util/api/parse.go @@ -0,0 +1,161 @@ +package api + +import ( + "fmt" + "math" + "net/http" + "strconv" + "strings" + "time" + + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/promql" + "github.com/prometheus/prometheus/promql/parser" + "github.com/weaveworks/common/httpgrpc" + + "github.com/cortexproject/cortex/pkg/util" +) + +var ( + // MinTime is the default timestamp used for the start of optional time ranges. + // Exposed to let downstream projects reference it. + // + // Historical note: This should just be time.Unix(math.MinInt64/1000, 0).UTC(), + // but it was set to a higher value in the past due to a misunderstanding. + // The value is still low enough for practical purposes, so we don't want + // to change it now, avoiding confusion for importers of this variable. + MinTime = time.Unix(math.MinInt64/1000+62135596801, 0).UTC() + + // MaxTime is the default timestamp used for the end of optional time ranges. + // Exposed to let downstream projects to reference it. + // + // Historical note: This should just be time.Unix(math.MaxInt64/1000, 0).UTC(), + // but it was set to a lower value in the past due to a misunderstanding. + // The value is still high enough for practical purposes, so we don't want + // to change it now, avoiding confusion for importers of this variable. + MaxTime = time.Unix(math.MaxInt64/1000-62135596801, 999999999).UTC() + + minTimeFormatted = MinTime.Format(time.RFC3339Nano) + maxTimeFormatted = MaxTime.Format(time.RFC3339Nano) +) + +func ExtractQueryOpts(r *http.Request) (promql.QueryOpts, error) { + var duration time.Duration + + if strDuration := r.FormValue("lookback_delta"); strDuration != "" { + parsedDuration, err := ParseDuration(strDuration) + if err != nil { + return nil, fmt.Errorf("error parsing lookback delta duration: %w", err) + } + duration = parsedDuration + } + + return promql.NewPrometheusQueryOpts(r.FormValue("stats") == "all", duration), nil +} + +func ParseTime(s string) (time.Time, error) { + if t, err := strconv.ParseFloat(s, 64); err == nil { + s, ns := math.Modf(t) + ns = math.Round(ns*1000) / 1000 + return time.Unix(int64(s), int64(ns*float64(time.Second))).UTC(), nil + } + if t, err := time.Parse(time.RFC3339Nano, s); err == nil { + return t, nil + } + + // Stdlib's time parser can only handle 4 digit years. As a workaround until + // that is fixed we want to at least support our own boundary times. + // Context: https://github.com/prometheus/client_golang/issues/614 + // Upstream issue: https://github.com/golang/go/issues/20555 + switch s { + case minTimeFormatted: + return MinTime, nil + case maxTimeFormatted: + return MaxTime, nil + } + return time.Time{}, fmt.Errorf("cannot parse %q to a valid timestamp", s) +} + +func ParseTimeMillis(s string) (int64, error) { + t, err := ParseTime(s) + if err != nil { + return 0, httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error()) + } + + return util.TimeToMillis(t), nil +} + +func ParseDuration(s string) (time.Duration, error) { + if d, err := strconv.ParseFloat(s, 64); err == nil { + ts := d * float64(time.Second) + if ts > float64(math.MaxInt64) || ts < float64(math.MinInt64) { + return 0, fmt.Errorf("cannot parse %q to a valid duration. It overflows int64", s) + } + return time.Duration(ts), nil + } + if d, err := model.ParseDuration(s); err == nil { + return time.Duration(d), nil + } + return 0, fmt.Errorf("cannot parse %q to a valid duration", s) +} + +func ParseDurationMillis(s string) (int64, error) { + d, err := ParseDuration(s) + if err != nil { + return 0, httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error()) + } + + return int64(d / (time.Millisecond / time.Nanosecond)), nil +} + +func ParseTimeParam(r *http.Request, paramName string, defaultValue time.Time) (time.Time, error) { + val := r.FormValue(paramName) + if val == "" { + val = strconv.FormatInt(defaultValue.Unix(), 10) + } + result, err := ParseTime(val) + if err != nil { + return time.Time{}, fmt.Errorf("invalid time value for '%s': %w", paramName, err) + } + return result, nil +} + +func ParseTimeParamMillis(r *http.Request, paramName string, defaultValue time.Time) (int64, error) { + t, err := ParseTimeParam(r, paramName, defaultValue) + if err != nil { + return 0, httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error()) + } + + return util.TimeToMillis(t), nil +} + +// FindMinMaxTime returns the time in milliseconds of the earliest and latest point in time the statement will try to process. +// This takes into account offsets, @ modifiers, and range selectors. +// If the expression does not select series, then FindMinMaxTime returns (0, 0). +func FindMinMaxTime(r *http.Request, expr parser.Expr, lookbackDelta time.Duration, now time.Time) (int64, int64) { + isQuery := strings.HasSuffix(r.URL.Path, "/query") + + var startTime, endTime int64 + if isQuery { + if t, err := ParseTimeParamMillis(r, "time", now); err == nil { + startTime = t + endTime = t + } + } else { + if st, err := ParseTimeMillis(r.FormValue("start")); err == nil { + if et, err := ParseTimeMillis(r.FormValue("end")); err == nil { + startTime = st + endTime = et + } + } + } + + es := &parser.EvalStmt{ + Expr: expr, + Start: util.TimeFromMillis(startTime), + End: util.TimeFromMillis(endTime), + LookbackDelta: lookbackDelta, + } + + return promql.FindMinMaxTime(es) +} diff --git a/pkg/util/api/parse_test.go b/pkg/util/api/parse_test.go new file mode 100644 index 00000000000..46b9ecf9138 --- /dev/null +++ b/pkg/util/api/parse_test.go @@ -0,0 +1,118 @@ +package api + +import ( + "bytes" + "net/http" + "strconv" + "testing" + "time" + + "github.com/prometheus/prometheus/promql/parser" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/cortexproject/cortex/pkg/util" +) + +func TestFindMinMaxTime(t *testing.T) { + now := time.Now() + + type testCase struct { + query string + lookbackDelta time.Duration + queryStartTime time.Time + queryEndTime time.Time + expectedMinTime time.Time + expectedMaxTime time.Time + } + + tests := map[string]testCase{ + "should consider min and max of the query param": { + query: "up", + queryStartTime: now.Add(-1 * time.Hour), + queryEndTime: now, + expectedMinTime: now.Add(-1 * time.Hour), + expectedMaxTime: now, + }, + "should consider min and max of inner queries": { + query: "go_gc_duration_seconds_count[2h] offset 30m + go_gc_duration_seconds_count[3h] offset 1h", + queryStartTime: now.Add(-1 * time.Hour), + queryEndTime: now, + expectedMinTime: now.Add(-5 * time.Hour), + expectedMaxTime: now.Add(-30 * time.Minute), + }, + "should consider lookback delta": { + query: "up", + lookbackDelta: 1 * time.Hour, + queryStartTime: now.Add(-1 * time.Hour), + queryEndTime: now, + expectedMinTime: now.Add(-2 * time.Hour), + expectedMaxTime: now, + }, + } + + for testName, testData := range tests { + t.Run(testName, func(t *testing.T) { + expr, _ := parser.ParseExpr(testData.query) + + url := "/query_range?query=" + testData.query + + "&start=" + strconv.FormatInt(testData.queryStartTime.Truncate(time.Minute).Unix(), 10) + + "&end=" + strconv.FormatInt(testData.queryEndTime.Truncate(time.Minute).Unix(), 10) + req, _ := http.NewRequest(http.MethodPost, url, bytes.NewReader([]byte{})) + + minTime, maxTime := FindMinMaxTime(req, expr, testData.lookbackDelta, now) + assert.Equal(t, testData.expectedMinTime.Truncate(time.Minute).UnixMilli()+1, minTime) // refer to https://github.com/prometheus/prometheus/issues/13213 for the reason +1 + assert.Equal(t, testData.expectedMaxTime.Truncate(time.Minute).UnixMilli(), maxTime) + }) + } +} + +func TestParseTime(t *testing.T) { + var tests = []struct { + input string + fail bool + result time.Time + }{ + { + input: "", + fail: true, + }, { + input: "abc", + fail: true, + }, { + input: "30s", + fail: true, + }, { + input: "123", + result: time.Unix(123, 0), + }, { + input: "123.123", + result: time.Unix(123, 123000000), + }, { + input: "2015-06-03T13:21:58.555Z", + result: time.Unix(1433337718, 555*time.Millisecond.Nanoseconds()), + }, { + input: "2015-06-03T14:21:58.555+01:00", + result: time.Unix(1433337718, 555*time.Millisecond.Nanoseconds()), + }, { + // Test nanosecond rounding. + input: "2015-06-03T13:21:58.56789Z", + result: time.Unix(1433337718, 567*1e6), + }, { + // Test float rounding. + input: "1543578564.705", + result: time.Unix(1543578564, 705*1e6), + }, + } + + for _, test := range tests { + ts, err := ParseTimeMillis(test.input) + if test.fail { + require.Error(t, err) + continue + } + + require.NoError(t, err) + assert.Equal(t, util.TimeToMillis(test.result), ts) + } +} diff --git a/pkg/util/api/response.go b/pkg/util/api/response.go index c58baf60b95..bb75a3c5f91 100644 --- a/pkg/util/api/response.go +++ b/pkg/util/api/response.go @@ -2,6 +2,7 @@ package api import ( "encoding/json" + "fmt" "net/http" "github.com/go-kit/log" @@ -9,6 +10,7 @@ import ( v1 "github.com/prometheus/client_golang/api/prometheus/v1" "github.com/weaveworks/common/httpgrpc" "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) const ( @@ -85,3 +87,11 @@ func RespondError(logger log.Logger, w http.ResponseWriter, errorType v1.ErrorTy level.Error(logger).Log("msg", "error writing response", "bytesWritten", n, "err", err) } } + +func DecorateWithParamName(err error, field string) error { + errTmpl := "invalid parameter %q; %v" + if status, ok := status.FromError(err); ok { + return httpgrpc.Errorf(int(status.Code()), errTmpl, field, status.Message()) + } + return fmt.Errorf(errTmpl, field, err) +} diff --git a/pkg/util/time.go b/pkg/util/time.go index 5e52493292c..ac7c49c29ba 100644 --- a/pkg/util/time.go +++ b/pkg/util/time.go @@ -4,16 +4,11 @@ import ( "context" "math" "math/rand" - "net/http" "strconv" - "strings" "time" - "github.com/pkg/errors" "github.com/prometheus/common/model" - "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/promql/parser" - "github.com/weaveworks/common/httpgrpc" ) const ( @@ -43,33 +38,6 @@ func FormatMillisToSeconds(ms int64) string { return strconv.FormatFloat(float64(ms)/float64(1000), 'f', -1, 64) } -// ParseTime parses the string into an int64, milliseconds since epoch. -func ParseTime(s string) (int64, error) { - if t, err := strconv.ParseFloat(s, 64); err == nil { - s, ns := math.Modf(t) - ns = math.Round(ns*1000) / 1000 - tm := time.Unix(int64(s), int64(ns*float64(time.Second))) - return TimeToMillis(tm), nil - } - if t, err := time.Parse(time.RFC3339Nano, s); err == nil { - return TimeToMillis(t), nil - } - return 0, httpgrpc.Errorf(http.StatusBadRequest, "cannot parse %q to a valid timestamp", s) -} - -// ParseTimeParam parses the time request parameter into an int64, milliseconds since epoch. -func ParseTimeParam(r *http.Request, paramName string, defaultValue int64) (int64, error) { - val := r.FormValue(paramName) - if val == "" { - val = strconv.FormatInt(defaultValue, 10) - } - result, err := ParseTime(val) - if err != nil { - return 0, errors.Wrapf(err, "Invalid time value for '%s'", paramName) - } - return result, nil -} - // DurationWithJitter returns random duration from "input - input*variance" to "input + input*variance" interval. func DurationWithJitter(input time.Duration, variancePerc float64) time.Duration { // No duration? No jitter. @@ -120,37 +88,6 @@ func NewDisableableTicker(interval time.Duration) (func(), <-chan time.Time) { return func() { tick.Stop() }, tick.C } -// FindMinMaxTime returns the time in milliseconds of the earliest and latest point in time the statement will try to process. -// This takes into account offsets, @ modifiers, and range selectors. -// If the expression does not select series, then FindMinMaxTime returns (0, 0). -func FindMinMaxTime(r *http.Request, expr parser.Expr, lookbackDelta time.Duration, now time.Time) (int64, int64) { - isQuery := strings.HasSuffix(r.URL.Path, "/query") - - var startTime, endTime int64 - if isQuery { - if t, err := ParseTimeParam(r, "time", now.UnixMilli()); err == nil { - startTime = t - endTime = t - } - } else { - if st, err := ParseTime(r.FormValue("start")); err == nil { - if et, err := ParseTime(r.FormValue("end")); err == nil { - startTime = st - endTime = et - } - } - } - - es := &parser.EvalStmt{ - Expr: expr, - Start: TimeFromMillis(startTime), - End: TimeFromMillis(endTime), - LookbackDelta: lookbackDelta, - } - - return promql.FindMinMaxTime(es) -} - // SlotInfoFunc returns the slot number and the total number of slots type SlotInfoFunc func() (int, int) @@ -227,20 +164,6 @@ func (t *SlottedTicker) nextInterval() time.Duration { return time.Until(lastStartTime) + PositiveJitter(slotSize, t.slotJitter) } -func ParseDurationMs(s string) (int64, error) { - if d, err := strconv.ParseFloat(s, 64); err == nil { - ts := d * float64(time.Second/time.Millisecond) - if ts > float64(math.MaxInt64) || ts < float64(math.MinInt64) { - return 0, httpgrpc.Errorf(http.StatusBadRequest, "cannot parse %q to a valid duration. It overflows int64", s) - } - return int64(ts), nil - } - if d, err := model.ParseDuration(s); err == nil { - return int64(d) / int64(time.Millisecond/time.Nanosecond), nil - } - return 0, httpgrpc.Errorf(http.StatusBadRequest, "cannot parse %q to a valid duration", s) -} - func DurationMilliseconds(d time.Duration) int64 { return int64(d / (time.Millisecond / time.Nanosecond)) } diff --git a/pkg/util/time_test.go b/pkg/util/time_test.go index 239c4eb5b0b..2f3aef76d08 100644 --- a/pkg/util/time_test.go +++ b/pkg/util/time_test.go @@ -1,14 +1,10 @@ package util import ( - "bytes" "fmt" - "net/http" - "strconv" "testing" "time" - "github.com/prometheus/prometheus/promql/parser" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/atomic" @@ -61,56 +57,6 @@ func TestDurationWithPositiveJitter_ZeroInputDuration(t *testing.T) { assert.Equal(t, time.Duration(0), DurationWithPositiveJitter(time.Duration(0), 0.5)) } -func TestParseTime(t *testing.T) { - var tests = []struct { - input string - fail bool - result time.Time - }{ - { - input: "", - fail: true, - }, { - input: "abc", - fail: true, - }, { - input: "30s", - fail: true, - }, { - input: "123", - result: time.Unix(123, 0), - }, { - input: "123.123", - result: time.Unix(123, 123000000), - }, { - input: "2015-06-03T13:21:58.555Z", - result: time.Unix(1433337718, 555*time.Millisecond.Nanoseconds()), - }, { - input: "2015-06-03T14:21:58.555+01:00", - result: time.Unix(1433337718, 555*time.Millisecond.Nanoseconds()), - }, { - // Test nanosecond rounding. - input: "2015-06-03T13:21:58.56789Z", - result: time.Unix(1433337718, 567*1e6), - }, { - // Test float rounding. - input: "1543578564.705", - result: time.Unix(1543578564, 705*1e6), - }, - } - - for _, test := range tests { - ts, err := ParseTime(test.input) - if test.fail { - require.Error(t, err) - continue - } - - require.NoError(t, err) - assert.Equal(t, TimeToMillis(test.result), ts) - } -} - func TestNewDisableableTicker_Enabled(t *testing.T) { stop, ch := NewDisableableTicker(10 * time.Millisecond) defer stop() @@ -139,59 +85,6 @@ func TestNewDisableableTicker_Disabled(t *testing.T) { } } -func TestFindMinMaxTime(t *testing.T) { - now := time.Now() - - type testCase struct { - query string - lookbackDelta time.Duration - queryStartTime time.Time - queryEndTime time.Time - expectedMinTime time.Time - expectedMaxTime time.Time - } - - tests := map[string]testCase{ - "should consider min and max of the query param": { - query: "up", - queryStartTime: now.Add(-1 * time.Hour), - queryEndTime: now, - expectedMinTime: now.Add(-1 * time.Hour), - expectedMaxTime: now, - }, - "should consider min and max of inner queries": { - query: "go_gc_duration_seconds_count[2h] offset 30m + go_gc_duration_seconds_count[3h] offset 1h", - queryStartTime: now.Add(-1 * time.Hour), - queryEndTime: now, - expectedMinTime: now.Add(-5 * time.Hour), - expectedMaxTime: now.Add(-30 * time.Minute), - }, - "should consider lookback delta": { - query: "up", - lookbackDelta: 1 * time.Hour, - queryStartTime: now.Add(-1 * time.Hour), - queryEndTime: now, - expectedMinTime: now.Add(-2 * time.Hour), - expectedMaxTime: now, - }, - } - - for testName, testData := range tests { - t.Run(testName, func(t *testing.T) { - expr, _ := parser.ParseExpr(testData.query) - - url := "/query_range?query=" + testData.query + - "&start=" + strconv.FormatInt(testData.queryStartTime.Truncate(time.Minute).Unix(), 10) + - "&end=" + strconv.FormatInt(testData.queryEndTime.Truncate(time.Minute).Unix(), 10) - req, _ := http.NewRequest(http.MethodPost, url, bytes.NewReader([]byte{})) - - minTime, maxTime := FindMinMaxTime(req, expr, testData.lookbackDelta, now) - assert.Equal(t, testData.expectedMinTime.Truncate(time.Minute).UnixMilli()+1, minTime) // refer to https://github.com/prometheus/prometheus/issues/13213 for the reason +1 - assert.Equal(t, testData.expectedMaxTime.Truncate(time.Minute).UnixMilli(), maxTime) - }) - } -} - func TestSlottedTicker(t *testing.T) { t.Parallel() testCases := map[string]struct { From 065b4fe5d23509a18098874abd75d1be9602b58b Mon Sep 17 00:00:00 2001 From: SungJin1212 Date: Tue, 3 Jun 2025 17:22:40 +0900 Subject: [PATCH 2/3] Move ExtractQueryOpts Signed-off-by: SungJin1212 --- pkg/api/query/handler.go | 4 ++-- pkg/api/query/util.go | 17 +++++++++++++++++ pkg/util/api/parse.go | 14 -------------- 3 files changed, 19 insertions(+), 16 deletions(-) diff --git a/pkg/api/query/handler.go b/pkg/api/query/handler.go index 622141ce37b..80beae5e229 100644 --- a/pkg/api/query/handler.go +++ b/pkg/api/query/handler.go @@ -94,7 +94,7 @@ func (q *QueryAPI) RangeQueryHandler(r *http.Request) (result apiFuncResult) { defer cancel() } - opts, err := api.ExtractQueryOpts(r) + opts, err := ExtractQueryOpts(r) if err != nil { return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil} } @@ -149,7 +149,7 @@ func (q *QueryAPI) InstantQueryHandler(r *http.Request) (result apiFuncResult) { defer cancel() } - opts, err := api.ExtractQueryOpts(r) + opts, err := ExtractQueryOpts(r) if err != nil { return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil} } diff --git a/pkg/api/query/util.go b/pkg/api/query/util.go index 0e63d3042a0..ff90ed67740 100644 --- a/pkg/api/query/util.go +++ b/pkg/api/query/util.go @@ -5,9 +5,12 @@ import ( "errors" "fmt" "net/http" + "time" "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/util/annotations" + + "github.com/cortexproject/cortex/pkg/util/api" ) const ( @@ -79,3 +82,17 @@ func invalidParamError(err error, parameter string) apiFuncResult { errorBadData, fmt.Errorf("invalid parameter %q: %w", parameter, err), }, nil, nil} } + +func ExtractQueryOpts(r *http.Request) (promql.QueryOpts, error) { + var duration time.Duration + + if strDuration := r.FormValue("lookback_delta"); strDuration != "" { + parsedDuration, err := api.ParseDuration(strDuration) + if err != nil { + return nil, fmt.Errorf("error parsing lookback delta duration: %w", err) + } + duration = parsedDuration + } + + return promql.NewPrometheusQueryOpts(r.FormValue("stats") == "all", duration), nil +} diff --git a/pkg/util/api/parse.go b/pkg/util/api/parse.go index e059c95fd03..b2268c56a49 100644 --- a/pkg/util/api/parse.go +++ b/pkg/util/api/parse.go @@ -39,20 +39,6 @@ var ( maxTimeFormatted = MaxTime.Format(time.RFC3339Nano) ) -func ExtractQueryOpts(r *http.Request) (promql.QueryOpts, error) { - var duration time.Duration - - if strDuration := r.FormValue("lookback_delta"); strDuration != "" { - parsedDuration, err := ParseDuration(strDuration) - if err != nil { - return nil, fmt.Errorf("error parsing lookback delta duration: %w", err) - } - duration = parsedDuration - } - - return promql.NewPrometheusQueryOpts(r.FormValue("stats") == "all", duration), nil -} - func ParseTime(s string) (time.Time, error) { if t, err := strconv.ParseFloat(s, 64); err == nil { s, ns := math.Modf(t) From 5320d89fc34203154a07bca1b544b929c6e15ae2 Mon Sep 17 00:00:00 2001 From: SungJin1212 Date: Thu, 5 Jun 2025 16:28:18 +0900 Subject: [PATCH 3/3] Move grpc wrap functions to tripperware Signed-off-by: SungJin1212 --- .../tripperware/instantquery/instant_query.go | 2 +- .../tripperware/query_attribute_matcher.go | 7 ++-- .../tripperware/queryrange/query_range.go | 6 +-- pkg/querier/tripperware/util.go | 30 +++++++++++++ pkg/util/api/parse.go | 42 +++---------------- pkg/util/api/parse_test.go | 6 +-- 6 files changed, 45 insertions(+), 48 deletions(-) diff --git a/pkg/querier/tripperware/instantquery/instant_query.go b/pkg/querier/tripperware/instantquery/instant_query.go index 025ef7fafd0..62e6a709a35 100644 --- a/pkg/querier/tripperware/instantquery/instant_query.go +++ b/pkg/querier/tripperware/instantquery/instant_query.go @@ -63,7 +63,7 @@ func NewInstantQueryCodec(compressionStr string, defaultCodecTypeStr string) ins func (c instantQueryCodec) DecodeRequest(_ context.Context, r *http.Request, forwardHeaders []string) (tripperware.Request, error) { result := tripperware.PrometheusRequest{Headers: map[string][]string{}} var err error - result.Time, err = api.ParseTimeParamMillis(r, "time", c.now()) + result.Time, err = tripperware.ParseTimeParamMillis(r, "time", c.now()) if err != nil { return nil, api.DecorateWithParamName(err, "time") } diff --git a/pkg/querier/tripperware/query_attribute_matcher.go b/pkg/querier/tripperware/query_attribute_matcher.go index 89d327f062b..f779ce5cd77 100644 --- a/pkg/querier/tripperware/query_attribute_matcher.go +++ b/pkg/querier/tripperware/query_attribute_matcher.go @@ -172,8 +172,8 @@ func matchAttributeForMetadataQuery(attribute validation.QueryAttribute, op stri } } - startTime, _ := api.ParseTimeMillis(r.FormValue("start")) - endTime, _ := api.ParseTimeMillis(r.FormValue("end")) + startTime, _ := ParseTimeMillis(r.FormValue("start")) + endTime, _ := ParseTimeMillis(r.FormValue("end")) if attribute.TimeWindow.Start != 0 || attribute.TimeWindow.End != 0 { matched = true @@ -236,8 +236,7 @@ func isWithinTimeRangeAttribute(limit validation.TimeRangeLimit, startTime, endT } func isWithinQueryStepLimit(queryStepLimit validation.QueryStepLimit, r *http.Request) bool { - - step, err := api.ParseDurationMillis(r.FormValue("step")) + step, err := ParseDurationMillis(r.FormValue("step")) if err != nil { return false } diff --git a/pkg/querier/tripperware/queryrange/query_range.go b/pkg/querier/tripperware/queryrange/query_range.go index 8633ec9429a..26c32d74714 100644 --- a/pkg/querier/tripperware/queryrange/query_range.go +++ b/pkg/querier/tripperware/queryrange/query_range.go @@ -96,12 +96,12 @@ func (c prometheusCodec) MergeResponse(ctx context.Context, req tripperware.Requ func (c prometheusCodec) DecodeRequest(_ context.Context, r *http.Request, forwardHeaders []string) (tripperware.Request, error) { result := tripperware.PrometheusRequest{Headers: map[string][]string{}} var err error - result.Start, err = api.ParseTimeMillis(r.FormValue("start")) + result.Start, err = tripperware.ParseTimeMillis(r.FormValue("start")) if err != nil { return nil, api.DecorateWithParamName(err, "start") } - result.End, err = api.ParseTimeMillis(r.FormValue("end")) + result.End, err = tripperware.ParseTimeMillis(r.FormValue("end")) if err != nil { return nil, api.DecorateWithParamName(err, "end") } @@ -110,7 +110,7 @@ func (c prometheusCodec) DecodeRequest(_ context.Context, r *http.Request, forwa return nil, api.ErrEndBeforeStart } - result.Step, err = api.ParseDurationMillis(r.FormValue("step")) + result.Step, err = tripperware.ParseDurationMillis(r.FormValue("step")) if err != nil { return nil, api.DecorateWithParamName(err, "step") } diff --git a/pkg/querier/tripperware/util.go b/pkg/querier/tripperware/util.go index c1e2144b969..c71a9759311 100644 --- a/pkg/querier/tripperware/util.go +++ b/pkg/querier/tripperware/util.go @@ -3,11 +3,14 @@ package tripperware import ( "context" "net/http" + "time" "github.com/weaveworks/common/httpgrpc" "github.com/cortexproject/cortex/pkg/querier/stats" "github.com/cortexproject/cortex/pkg/tenant" + "github.com/cortexproject/cortex/pkg/util" + "github.com/cortexproject/cortex/pkg/util/api" "github.com/cortexproject/cortex/pkg/util/validation" ) @@ -87,3 +90,30 @@ func SetQueryResponseStats(a *PrometheusResponse, queryStats *stats.QueryStats) } } } + +func ParseTimeParamMillis(r *http.Request, paramName string, defaultValue time.Time) (int64, error) { + t, err := api.ParseTimeParam(r, paramName, defaultValue) + if err != nil { + return 0, httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error()) + } + + return util.TimeToMillis(t), nil +} + +func ParseTimeMillis(s string) (int64, error) { + t, err := api.ParseTime(s) + if err != nil { + return 0, httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error()) + } + + return util.TimeToMillis(t), nil +} + +func ParseDurationMillis(s string) (int64, error) { + d, err := api.ParseDuration(s) + if err != nil { + return 0, httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error()) + } + + return int64(d / (time.Millisecond / time.Nanosecond)), nil +} diff --git a/pkg/util/api/parse.go b/pkg/util/api/parse.go index b2268c56a49..0dcabf0d080 100644 --- a/pkg/util/api/parse.go +++ b/pkg/util/api/parse.go @@ -11,9 +11,6 @@ import ( "github.com/prometheus/common/model" "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/promql/parser" - "github.com/weaveworks/common/httpgrpc" - - "github.com/cortexproject/cortex/pkg/util" ) var ( @@ -62,15 +59,6 @@ func ParseTime(s string) (time.Time, error) { return time.Time{}, fmt.Errorf("cannot parse %q to a valid timestamp", s) } -func ParseTimeMillis(s string) (int64, error) { - t, err := ParseTime(s) - if err != nil { - return 0, httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error()) - } - - return util.TimeToMillis(t), nil -} - func ParseDuration(s string) (time.Duration, error) { if d, err := strconv.ParseFloat(s, 64); err == nil { ts := d * float64(time.Second) @@ -85,15 +73,6 @@ func ParseDuration(s string) (time.Duration, error) { return 0, fmt.Errorf("cannot parse %q to a valid duration", s) } -func ParseDurationMillis(s string) (int64, error) { - d, err := ParseDuration(s) - if err != nil { - return 0, httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error()) - } - - return int64(d / (time.Millisecond / time.Nanosecond)), nil -} - func ParseTimeParam(r *http.Request, paramName string, defaultValue time.Time) (time.Time, error) { val := r.FormValue(paramName) if val == "" { @@ -106,30 +85,21 @@ func ParseTimeParam(r *http.Request, paramName string, defaultValue time.Time) ( return result, nil } -func ParseTimeParamMillis(r *http.Request, paramName string, defaultValue time.Time) (int64, error) { - t, err := ParseTimeParam(r, paramName, defaultValue) - if err != nil { - return 0, httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error()) - } - - return util.TimeToMillis(t), nil -} - // FindMinMaxTime returns the time in milliseconds of the earliest and latest point in time the statement will try to process. // This takes into account offsets, @ modifiers, and range selectors. // If the expression does not select series, then FindMinMaxTime returns (0, 0). func FindMinMaxTime(r *http.Request, expr parser.Expr, lookbackDelta time.Duration, now time.Time) (int64, int64) { isQuery := strings.HasSuffix(r.URL.Path, "/query") - var startTime, endTime int64 + var startTime, endTime time.Time if isQuery { - if t, err := ParseTimeParamMillis(r, "time", now); err == nil { + if t, err := ParseTimeParam(r, "time", now); err == nil { startTime = t endTime = t } } else { - if st, err := ParseTimeMillis(r.FormValue("start")); err == nil { - if et, err := ParseTimeMillis(r.FormValue("end")); err == nil { + if st, err := ParseTime(r.FormValue("start")); err == nil { + if et, err := ParseTime(r.FormValue("end")); err == nil { startTime = st endTime = et } @@ -138,8 +108,8 @@ func FindMinMaxTime(r *http.Request, expr parser.Expr, lookbackDelta time.Durati es := &parser.EvalStmt{ Expr: expr, - Start: util.TimeFromMillis(startTime), - End: util.TimeFromMillis(endTime), + Start: startTime, + End: endTime, LookbackDelta: lookbackDelta, } diff --git a/pkg/util/api/parse_test.go b/pkg/util/api/parse_test.go index 46b9ecf9138..75801e0e718 100644 --- a/pkg/util/api/parse_test.go +++ b/pkg/util/api/parse_test.go @@ -10,8 +10,6 @@ import ( "github.com/prometheus/prometheus/promql/parser" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - - "github.com/cortexproject/cortex/pkg/util" ) func TestFindMinMaxTime(t *testing.T) { @@ -106,13 +104,13 @@ func TestParseTime(t *testing.T) { } for _, test := range tests { - ts, err := ParseTimeMillis(test.input) + ts, err := ParseTime(test.input) if test.fail { require.Error(t, err) continue } require.NoError(t, err) - assert.Equal(t, util.TimeToMillis(test.result), ts) + assert.Equal(t, test.result.UnixMilli(), ts.UnixMilli()) } }