diff --git a/pkg/querier/astmapper/astmapper.go b/pkg/querier/astmapper/astmapper.go deleted file mode 100644 index 702efcfeda8..00000000000 --- a/pkg/querier/astmapper/astmapper.go +++ /dev/null @@ -1,187 +0,0 @@ -package astmapper - -import ( - "github.com/pkg/errors" - "github.com/prometheus/prometheus/promql/parser" -) - -// ASTMapper is the exported interface for mapping between multiple AST representations -type ASTMapper interface { - Map(node parser.Node) (parser.Node, error) -} - -// MapperFunc is a function adapter for ASTMapper -type MapperFunc func(node parser.Node) (parser.Node, error) - -// Map applies a mapperfunc as an ASTMapper -func (fn MapperFunc) Map(node parser.Node) (parser.Node, error) { - return fn(node) -} - -// MultiMapper can compose multiple ASTMappers -type MultiMapper struct { - mappers []ASTMapper -} - -// Map implements ASTMapper -func (m *MultiMapper) Map(node parser.Node) (parser.Node, error) { - var result parser.Node = node - var err error - - if len(m.mappers) == 0 { - return nil, errors.New("MultiMapper: No mappers registered") - } - - for _, x := range m.mappers { - result, err = x.Map(result) - if err != nil { - return nil, err - } - } - return result, nil - -} - -// Register adds ASTMappers into a multimapper. -// Since registered functions are applied in the order they're registered, it's advised to register them -// in decreasing priority and only operate on nodes that each function cares about, defaulting to CloneNode. -func (m *MultiMapper) Register(xs ...ASTMapper) { - m.mappers = append(m.mappers, xs...) -} - -// NewMultiMapper instaniates an ASTMapper from multiple ASTMappers -func NewMultiMapper(xs ...ASTMapper) *MultiMapper { - m := &MultiMapper{} - m.Register(xs...) - return m -} - -// CloneNode is a helper function to clone a node. -func CloneNode(node parser.Node) (parser.Node, error) { - return parser.ParseExpr(node.String()) -} - -// NodeMapper either maps a single AST node or returns the unaltered node. -// It also returns a bool to signal that no further recursion is necessary. -// This is helpful because it allows mappers to only implement logic for node types they want to change. -// It makes some mappers trivially easy to implement -type NodeMapper interface { - MapNode(node parser.Node) (mapped parser.Node, finished bool, err error) -} - -// NodeMapperFunc is an adapter for NodeMapper -type NodeMapperFunc func(node parser.Node) (parser.Node, bool, error) - -// MapNode applies a NodeMapperFunc as a NodeMapper -func (f NodeMapperFunc) MapNode(node parser.Node) (parser.Node, bool, error) { - return f(node) -} - -// NewASTNodeMapper creates an ASTMapper from a NodeMapper -func NewASTNodeMapper(mapper NodeMapper) ASTNodeMapper { - return ASTNodeMapper{mapper} -} - -// ASTNodeMapper is an ASTMapper adapter which uses a NodeMapper internally. -type ASTNodeMapper struct { - NodeMapper -} - -// Map implements ASTMapper from a NodeMapper -func (nm ASTNodeMapper) Map(node parser.Node) (parser.Node, error) { - node, fin, err := nm.MapNode(node) - - if err != nil { - return nil, err - } - - if fin { - return node, nil - } - - switch n := node.(type) { - case nil: - // nil handles cases where we check optional fields that are not set - return nil, nil - - case parser.Expressions: - for i, e := range n { - mapped, err := nm.Map(e) - if err != nil { - return nil, err - } - n[i] = mapped.(parser.Expr) - } - return n, nil - - case *parser.AggregateExpr: - expr, err := nm.Map(n.Expr) - if err != nil { - return nil, err - } - n.Expr = expr.(parser.Expr) - return n, nil - - case *parser.BinaryExpr: - lhs, err := nm.Map(n.LHS) - if err != nil { - return nil, err - } - n.LHS = lhs.(parser.Expr) - - rhs, err := nm.Map(n.RHS) - if err != nil { - return nil, err - } - n.RHS = rhs.(parser.Expr) - return n, nil - - case *parser.Call: - for i, e := range n.Args { - mapped, err := nm.Map(e) - if err != nil { - return nil, err - } - n.Args[i] = mapped.(parser.Expr) - } - return n, nil - - case *parser.SubqueryExpr: - mapped, err := nm.Map(n.Expr) - if err != nil { - return nil, err - } - n.Expr = mapped.(parser.Expr) - return n, nil - - case *parser.ParenExpr: - mapped, err := nm.Map(n.Expr) - if err != nil { - return nil, err - } - n.Expr = mapped.(parser.Expr) - return n, nil - - case *parser.UnaryExpr: - mapped, err := nm.Map(n.Expr) - if err != nil { - return nil, err - } - n.Expr = mapped.(parser.Expr) - return n, nil - - case *parser.EvalStmt: - mapped, err := nm.Map(n.Expr) - if err != nil { - return nil, err - } - n.Expr = mapped.(parser.Expr) - return n, nil - - case *parser.NumberLiteral, *parser.StringLiteral, *parser.VectorSelector, *parser.MatrixSelector: - return n, nil - - default: - panic(errors.Errorf("nodeMapper: unhandled node type %T", node)) - } -} diff --git a/pkg/querier/astmapper/astmapper_test.go b/pkg/querier/astmapper/astmapper_test.go deleted file mode 100644 index b98bc6434d9..00000000000 --- a/pkg/querier/astmapper/astmapper_test.go +++ /dev/null @@ -1,110 +0,0 @@ -package astmapper - -import ( - "fmt" - "testing" - - "github.com/prometheus/common/model" - "github.com/prometheus/prometheus/model/labels" - "github.com/prometheus/prometheus/promql/parser" - "github.com/stretchr/testify/require" -) - -func TestCloneNode(t *testing.T) { - var testExpr = []struct { - input parser.Expr - expected parser.Expr - }{ - // simple unmodified case - { - &parser.BinaryExpr{ - Op: parser.ADD, - LHS: &parser.NumberLiteral{Val: 1}, - RHS: &parser.NumberLiteral{Val: 1}, - }, - &parser.BinaryExpr{ - Op: parser.ADD, - LHS: &parser.NumberLiteral{Val: 1, PosRange: parser.PositionRange{Start: 0, End: 1}}, - RHS: &parser.NumberLiteral{Val: 1, PosRange: parser.PositionRange{Start: 4, End: 5}}, - }, - }, - { - &parser.AggregateExpr{ - Op: parser.SUM, - Without: true, - Expr: &parser.VectorSelector{ - Name: "some_metric", - LabelMatchers: []*labels.Matcher{ - mustLabelMatcher(labels.MatchEqual, string(model.MetricNameLabel), "some_metric"), - }, - }, - Grouping: []string{"foo"}, - }, - &parser.AggregateExpr{ - Op: parser.SUM, - Without: true, - Expr: &parser.VectorSelector{ - Name: "some_metric", - LabelMatchers: []*labels.Matcher{ - mustLabelMatcher(labels.MatchEqual, string(model.MetricNameLabel), "some_metric"), - }, - PosRange: parser.PositionRange{ - Start: 18, - End: 29, - }, - }, - Grouping: []string{"foo"}, - PosRange: parser.PositionRange{ - Start: 0, - End: 30, - }, - }, - }, - } - - for i, c := range testExpr { - t.Run(fmt.Sprintf("[%d]", i), func(t *testing.T) { - res, err := CloneNode(c.input) - require.NoError(t, err) - require.Equal(t, c.expected, res) - }) - } -} - -func TestCloneNode_String(t *testing.T) { - var testExpr = []struct { - input string - expected string - }{ - { - input: `rate(http_requests_total{cluster="us-central1"}[1m])`, - expected: `rate(http_requests_total{cluster="us-central1"}[1m])`, - }, - { - input: `sum( -sum(rate(http_requests_total{cluster="us-central1"}[1m])) -/ -sum(rate(http_requests_total{cluster="ops-tools1"}[1m])) -)`, - expected: `sum(sum(rate(http_requests_total{cluster="us-central1"}[1m])) / sum(rate(http_requests_total{cluster="ops-tools1"}[1m])))`, - }, - } - - for i, c := range testExpr { - t.Run(fmt.Sprintf("[%d]", i), func(t *testing.T) { - expr, err := parser.ParseExpr(c.input) - require.Nil(t, err) - res, err := CloneNode(expr) - require.Nil(t, err) - require.Equal(t, c.expected, res.String()) - }) - } -} - -func mustLabelMatcher(mt labels.MatchType, name, val string) *labels.Matcher { - m, err := labels.NewMatcher(mt, name, val) - if err != nil { - panic(err) - } - return m -} diff --git a/pkg/querier/astmapper/embedded.go b/pkg/querier/astmapper/embedded.go deleted file mode 100644 index 6e782ead1fe..00000000000 --- a/pkg/querier/astmapper/embedded.go +++ /dev/null @@ -1,82 +0,0 @@ -package astmapper - -import ( - "encoding/json" - - "github.com/prometheus/prometheus/model/labels" - "github.com/prometheus/prometheus/promql/parser" -) - -/* -Design: - -The prometheus api package enforces a (*promql.Engine argument), making it infeasible to do lazy AST -evaluation and substitution from within this package. -This leaves the (storage.Queryable) interface as the remaining target for conducting application level sharding. - -The main idea is to analyze the AST and determine which subtrees can be parallelized. With those in hand, the queries may -be remapped into vector or matrix selectors utilizing a reserved label containing the original query. These may then be parallelized in the storage implementation. -*/ - -const ( - // QueryLabel is a reserved label containing an embedded query - QueryLabel = "__cortex_queries__" - // EmbeddedQueriesMetricName is a reserved label (metric name) denoting an embedded query - EmbeddedQueriesMetricName = "__embedded_queries__" -) - -// EmbeddedQueries is a wrapper type for encoding queries -type EmbeddedQueries struct { - Concat []string `json:"Concat"` -} - -// JSONCodec is a Codec that uses JSON representations of EmbeddedQueries structs -var JSONCodec jsonCodec - -type jsonCodec struct{} - -func (c jsonCodec) Encode(queries []string) (string, error) { - embedded := EmbeddedQueries{ - Concat: queries, - } - b, err := json.Marshal(embedded) - return string(b), err -} - -func (c jsonCodec) Decode(encoded string) (queries []string, err error) { - var embedded EmbeddedQueries - err = json.Unmarshal([]byte(encoded), &embedded) - if err != nil { - return nil, err - } - - return embedded.Concat, nil -} - -// VectorSquash reduces an AST into a single vector query which can be hijacked by a Queryable impl. -// It always uses a VectorSelector as the substitution node. -// This is important because logical/set binops can only be applied against vectors and not matrices. -func VectorSquasher(nodes ...parser.Node) (parser.Expr, error) { - - // concat OR legs - strs := make([]string, 0, len(nodes)) - for _, node := range nodes { - strs = append(strs, node.String()) - } - - encoded, err := JSONCodec.Encode(strs) - if err != nil { - return nil, err - } - - embeddedQuery, err := labels.NewMatcher(labels.MatchEqual, QueryLabel, encoded) - if err != nil { - return nil, err - } - - return &parser.VectorSelector{ - Name: EmbeddedQueriesMetricName, - LabelMatchers: []*labels.Matcher{embeddedQuery}, - }, nil - -} diff --git a/pkg/querier/astmapper/parallel.go b/pkg/querier/astmapper/parallel.go deleted file mode 100644 index d7dc5129776..00000000000 --- a/pkg/querier/astmapper/parallel.go +++ /dev/null @@ -1,108 +0,0 @@ -package astmapper - -import ( - "fmt" - - "github.com/go-kit/log/level" - "github.com/prometheus/prometheus/promql/parser" - - util_log "github.com/cortexproject/cortex/pkg/util/log" -) - -var summableAggregates = map[parser.ItemType]struct{}{ - parser.SUM: {}, - parser.MIN: {}, - parser.MAX: {}, - parser.TOPK: {}, - parser.BOTTOMK: {}, - parser.COUNT: {}, -} - -var nonParallelFuncs = []string{ - "histogram_quantile", - "quantile_over_time", - "absent", -} - -// CanParallelize tests if a subtree is parallelizable. -// A subtree is parallelizable if all of its components are parallelizable. -func CanParallelize(node parser.Node) bool { - switch n := node.(type) { - case nil: - // nil handles cases where we check optional fields that are not set - return true - - case parser.Expressions: - for _, e := range n { - if !CanParallelize(e) { - return false - } - } - return true - - case *parser.AggregateExpr: - _, ok := summableAggregates[n.Op] - if !ok { - return false - } - - // Ensure there are no nested aggregations - nestedAggs, err := Predicate(n.Expr, func(node parser.Node) (bool, error) { - _, ok := node.(*parser.AggregateExpr) - return ok, nil - }) - - return err == nil && !nestedAggs && CanParallelize(n.Expr) - - case *parser.BinaryExpr: - // since binary exprs use each side for merging, they cannot be parallelized - return false - - case *parser.Call: - if n.Func == nil { - return false - } - if !ParallelizableFunc(*n.Func) { - return false - } - - for _, e := range n.Args { - if !CanParallelize(e) { - return false - } - } - return true - - case *parser.SubqueryExpr: - return CanParallelize(n.Expr) - - case *parser.ParenExpr: - return CanParallelize(n.Expr) - - case *parser.UnaryExpr: - // Since these are only currently supported for Scalars, should be parallel-compatible - return true - - case *parser.EvalStmt: - return CanParallelize(n.Expr) - - case *parser.MatrixSelector, *parser.NumberLiteral, *parser.StringLiteral, *parser.VectorSelector: - return true - - default: - level.Error(util_log.Logger).Log("err", fmt.Sprintf("CanParallel: unhandled node type %T", node)) //lint:ignore faillint allow global logger for now - return false - } - -} - -// ParallelizableFunc ensures that a promql function can be part of a parallel query. -func ParallelizableFunc(f parser.Function) bool { - - for _, v := range nonParallelFuncs { - if v == f.Name { - return false - } - } - return true -} diff --git a/pkg/querier/astmapper/parallel_test.go b/pkg/querier/astmapper/parallel_test.go deleted file mode 100644 index aed2b4fb7a6..00000000000 --- a/pkg/querier/astmapper/parallel_test.go +++ /dev/null @@ -1,129 +0,0 @@ -package astmapper - -import ( - "fmt" - "testing" - - "github.com/prometheus/common/model" - "github.com/prometheus/prometheus/model/labels" - "github.com/prometheus/prometheus/promql/parser" - "github.com/stretchr/testify/require" -) - -func TestCanParallel(t *testing.T) { - var testExpr = []struct { - input parser.Expr - expected bool - }{ - // simple sum - { - &parser.AggregateExpr{ - Op: parser.SUM, - Without: true, - Expr: &parser.VectorSelector{ - Name: "some_metric", - LabelMatchers: []*labels.Matcher{ - mustLabelMatcher(labels.MatchEqual, string(model.MetricNameLabel), "some_metric"), - }, - }, - Grouping: []string{"foo"}, - }, - true, - }, - /* - sum( - sum by (foo) bar1{baz=”blip”}[1m]) - / - sum by (foo) bar2{baz=”blip”}[1m])) - ) - */ - { - &parser.AggregateExpr{ - Op: parser.SUM, - Expr: &parser.BinaryExpr{ - Op: parser.DIV, - LHS: &parser.AggregateExpr{ - Op: parser.SUM, - Grouping: []string{"foo"}, - Expr: &parser.VectorSelector{ - Name: "idk", - LabelMatchers: []*labels.Matcher{ - mustLabelMatcher(labels.MatchEqual, string(model.MetricNameLabel), "bar1"), - }}, - }, - RHS: &parser.AggregateExpr{ - Op: parser.SUM, - Grouping: []string{"foo"}, - Expr: &parser.VectorSelector{ - Name: "idk", - LabelMatchers: []*labels.Matcher{ - mustLabelMatcher(labels.MatchEqual, string(model.MetricNameLabel), "bar2"), - }}, - }, - }, - }, - false, - }, - // sum by (foo) bar1{baz=”blip”}[1m]) ---- this is the first leg of the above - { - &parser.AggregateExpr{ - Op: parser.SUM, - Grouping: []string{"foo"}, - Expr: &parser.VectorSelector{ - Name: "idk", - LabelMatchers: []*labels.Matcher{ - mustLabelMatcher(labels.MatchEqual, string(model.MetricNameLabel), "bar1"), - }}, - }, - true, - }, - } - - for i, c := range testExpr { - t.Run(fmt.Sprintf("[%d]", i), func(t *testing.T) { - res := CanParallelize(c.input) - require.Equal(t, c.expected, res) - }) - } -} - -func TestCanParallel_String(t *testing.T) { - var testExpr = []struct { - input string - expected bool - }{ - { - `sum by (foo) (rate(bar1{baz="blip"}[1m]))`, - true, - }, - { - `sum by (foo) (histogram_quantile(0.9, rate(http_request_duration_seconds_bucket[10m])))`, - false, - }, - { - `sum by (foo) ( - quantile_over_time(0.9, http_request_duration_seconds_bucket[10m]) - )`, - false, - }, - { - `sum( - count( - count( - foo{bar="baz"} - ) by (a,b) - ) by (instance) - )`, - false, - }, - } - - for i, c := range testExpr { - t.Run(fmt.Sprintf("[%d]", i), func(t *testing.T) { - expr, err := parser.ParseExpr(c.input) - require.Nil(t, err) - res := CanParallelize(expr) - require.Equal(t, c.expected, res) - }) - } -} diff --git a/pkg/querier/astmapper/shard_summer.go b/pkg/querier/astmapper/shard_summer.go deleted file mode 100644 index 681399d09fb..00000000000 --- a/pkg/querier/astmapper/shard_summer.go +++ /dev/null @@ -1,314 +0,0 @@ -package astmapper - -import ( - "fmt" - "regexp" - "strconv" - "strings" - - "github.com/pkg/errors" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/prometheus/model/labels" - "github.com/prometheus/prometheus/promql/parser" -) - -const ( - // ShardLabel is a reserved label referencing a cortex shard - ShardLabel = "__cortex_shard__" - // ShardLabelFmt is the fmt of the ShardLabel key. - ShardLabelFmt = "%d_of_%d" -) - -var ( - // ShardLabelRE matches a value in ShardLabelFmt - ShardLabelRE = regexp.MustCompile("^[0-9]+_of_[0-9]+$") -) - -type squasher = func(...parser.Node) (parser.Expr, error) - -type shardSummer struct { - shards int - currentShard *int - squash squasher - - // Metrics. - shardedQueries prometheus.Counter -} - -// NewShardSummer instantiates an ASTMapper which will fan out sum queries by shard -func NewShardSummer(shards int, squasher squasher, shardedQueries prometheus.Counter) (ASTMapper, error) { - if squasher == nil { - return nil, errors.Errorf("squasher required and not passed") - } - - return NewASTNodeMapper(&shardSummer{ - shards: shards, - squash: squasher, - currentShard: nil, - shardedQueries: shardedQueries, - }), nil -} - -// CopyWithCurShard clones a shardSummer with a new current shard. -func (summer *shardSummer) CopyWithCurShard(curshard int) *shardSummer { - s := *summer - s.currentShard = &curshard - return &s -} - -// shardSummer expands a query AST by sharding and re-summing when possible -func (summer *shardSummer) MapNode(node parser.Node) (parser.Node, bool, error) { - - switch n := node.(type) { - case *parser.AggregateExpr: - if CanParallelize(n) && n.Op == parser.SUM { - result, err := summer.shardSum(n) - return result, true, err - } - - return n, false, nil - - case *parser.VectorSelector: - if summer.currentShard != nil { - mapped, err := shardVectorSelector(*summer.currentShard, summer.shards, n) - return mapped, true, err - } - return n, true, nil - - case *parser.MatrixSelector: - if summer.currentShard != nil { - mapped, err := shardMatrixSelector(*summer.currentShard, summer.shards, n) - return mapped, true, err - } - return n, true, nil - - default: - return n, false, nil - } -} - -// shardSum contains the logic for how we split/stitch legs of a parallelized sum query -func (summer *shardSummer) shardSum(expr *parser.AggregateExpr) (parser.Node, error) { - - parent, subSums, err := summer.splitSum(expr) - if err != nil { - return nil, err - } - - combinedSums, err := summer.squash(subSums...) - - if err != nil { - return nil, err - } - - parent.Expr = combinedSums - return parent, nil -} - -// splitSum forms the parent and child legs of a parallel query -func (summer *shardSummer) splitSum( - expr *parser.AggregateExpr, -) ( - parent *parser.AggregateExpr, - children []parser.Node, - err error, -) { - parent = &parser.AggregateExpr{ - Op: expr.Op, - Param: expr.Param, - } - var mkChild func(sharded *parser.AggregateExpr) parser.Expr - - if expr.Without { - /* - parallelizing a sum using without(foo) is representable naively as - sum without(foo) ( - sum without(__cortex_shard__) (rate(bar1{__cortex_shard__="0_of_2",baz="blip"}[1m])) or - sum without(__cortex_shard__) (rate(bar1{__cortex_shard__="1_of_2",baz="blip"}[1m])) - ) - or (more optimized): - sum without(__cortex_shard__) ( - sum without(foo) (rate(bar1{__cortex_shard__="0_of_2",baz="blip"}[1m])) or - sum without(foo) (rate(bar1{__cortex_shard__="1_of_2",baz="blip"}[1m])) - ) - - */ - parent.Grouping = []string{ShardLabel} - parent.Without = true - mkChild = func(sharded *parser.AggregateExpr) parser.Expr { - sharded.Grouping = expr.Grouping - sharded.Without = true - return sharded - } - } else if len(expr.Grouping) > 0 { - /* - parallelizing a sum using by(foo) is representable as - sum by(foo) ( - sum by(foo, __cortex_shard__) (rate(bar1{__cortex_shard__="0_of_2",baz="blip"}[1m])) or - sum by(foo, __cortex_shard__) (rate(bar1{__cortex_shard__="1_of_2",baz="blip"}[1m])) - ) - */ - parent.Grouping = expr.Grouping - mkChild = func(sharded *parser.AggregateExpr) parser.Expr { - groups := make([]string, 0, len(expr.Grouping)+1) - groups = append(groups, expr.Grouping...) - groups = append(groups, ShardLabel) - sharded.Grouping = groups - return sharded - } - } else { - /* - parallelizing a non-parameterized sum is representable as - sum( - sum without(__cortex_shard__) (rate(bar1{__cortex_shard__="0_of_2",baz="blip"}[1m])) or - sum without(__cortex_shard__) (rate(bar1{__cortex_shard__="1_of_2",baz="blip"}[1m])) - ) - or (more optimized): - sum without(__cortex_shard__) ( - sum by(__cortex_shard__) (rate(bar1{__cortex_shard__="0_of_2",baz="blip"}[1m])) or - sum by(__cortex_shard__) (rate(bar1{__cortex_shard__="1_of_2",baz="blip"}[1m])) - ) - */ - parent.Grouping = []string{ShardLabel} - parent.Without = true - mkChild = func(sharded *parser.AggregateExpr) parser.Expr { - sharded.Grouping = []string{ShardLabel} - return sharded - } - } - - // iterate across shardFactor to create children - for i := 0; i < summer.shards; i++ { - cloned, err := CloneNode(expr.Expr) - if err != nil { - return parent, children, err - } - - subSummer := NewASTNodeMapper(summer.CopyWithCurShard(i)) - sharded, err := subSummer.Map(cloned) - if err != nil { - return parent, children, err - } - - subSum := mkChild(&parser.AggregateExpr{ - Op: expr.Op, - Expr: sharded.(parser.Expr), - }) - - children = append(children, - subSum, - ) - } - - summer.recordShards(float64(summer.shards)) - - return parent, children, nil -} - -// ShardSummer is explicitly passed a prometheus.Counter during construction -// in order to prevent duplicate metric registerings (ShardSummers are created per request). -// recordShards prevents calling nil interfaces (commonly used in tests). -func (summer *shardSummer) recordShards(n float64) { - if summer.shardedQueries != nil { - summer.shardedQueries.Add(float64(summer.shards)) - } -} - -func shardVectorSelector(curshard, shards int, selector *parser.VectorSelector) (parser.Node, error) { - shardMatcher, err := labels.NewMatcher(labels.MatchEqual, ShardLabel, fmt.Sprintf(ShardLabelFmt, curshard, shards)) - if err != nil { - return nil, err - } - - return &parser.VectorSelector{ - Name: selector.Name, - Offset: selector.Offset, - LabelMatchers: append( - []*labels.Matcher{shardMatcher}, - selector.LabelMatchers..., - ), - }, nil -} - -func shardMatrixSelector(curshard, shards int, selector *parser.MatrixSelector) (parser.Node, error) { - shardMatcher, err := labels.NewMatcher(labels.MatchEqual, ShardLabel, fmt.Sprintf(ShardLabelFmt, curshard, shards)) - if err != nil { - return nil, err - } - - if vs, ok := selector.VectorSelector.(*parser.VectorSelector); ok { - return &parser.MatrixSelector{ - VectorSelector: &parser.VectorSelector{ - Name: vs.Name, - Offset: vs.Offset, - LabelMatchers: append( - []*labels.Matcher{shardMatcher}, - vs.LabelMatchers..., - ), - PosRange: vs.PosRange, - }, - Range: selector.Range, - EndPos: selector.EndPos, - }, nil - } - - return nil, fmt.Errorf("invalid selector type: %T", selector.VectorSelector) -} - -// ParseShard will extract the shard information encoded in ShardLabelFmt -func ParseShard(input string) (parsed ShardAnnotation, err error) { - if !ShardLabelRE.MatchString(input) { - return parsed, errors.Errorf("Invalid ShardLabel value: [%s]", input) - } - - matches := strings.Split(input, "_") - x, err := strconv.Atoi(matches[0]) - if err != nil { - return parsed, err - } - of, err := strconv.Atoi(matches[2]) - if err != nil { - return parsed, err - } - - if x >= of { - return parsed, errors.Errorf("Shards out of bounds: [%d] >= [%d]", x, of) - } - return ShardAnnotation{ - Shard: x, - Of: of, - }, err -} - -// ShardAnnotation is a convenience struct which holds data from a parsed shard label -type ShardAnnotation struct { - Shard int - Of int -} - -// String encodes a shardAnnotation into a label value -func (shard ShardAnnotation) String() string { - return fmt.Sprintf(ShardLabelFmt, shard.Shard, shard.Of) -} - -// Label generates the ShardAnnotation as a label -func (shard ShardAnnotation) Label() labels.Label { - return labels.Label{ - Name: ShardLabel, - Value: shard.String(), - } -} - -// ShardFromMatchers extracts a ShardAnnotation and the index it was pulled from in the matcher list -func ShardFromMatchers(matchers []*labels.Matcher) (shard *ShardAnnotation, idx int, err error) { - for i, matcher := range matchers { - if matcher.Name == ShardLabel && matcher.Type == labels.MatchEqual { - shard, err := ParseShard(matcher.Value) - if err != nil { - return nil, i, err - } - return &shard, i, nil - } - } - return nil, 0, nil -} diff --git a/pkg/querier/astmapper/shard_summer_test.go b/pkg/querier/astmapper/shard_summer_test.go deleted file mode 100644 index 3d503805ccd..00000000000 --- a/pkg/querier/astmapper/shard_summer_test.go +++ /dev/null @@ -1,269 +0,0 @@ -package astmapper - -import ( - "fmt" - "testing" - - "github.com/prometheus/prometheus/model/labels" - "github.com/prometheus/prometheus/promql/parser" - "github.com/stretchr/testify/require" -) - -// orSquasher is a custom squasher which mimics the intuitive but less efficient OR'ing of sharded vectors. -// It's helpful for tests because of its intuitive & human readable output. -func orSquasher(nodes ...parser.Node) (parser.Expr, error) { - combined := nodes[0] - for i := 1; i < len(nodes); i++ { - combined = &parser.BinaryExpr{ - Op: parser.LOR, - LHS: combined.(parser.Expr), - RHS: nodes[i].(parser.Expr), - } - } - return combined.(parser.Expr), nil -} - -func TestShardSummer(t *testing.T) { - var testExpr = []struct { - shards int - input string - expected string - }{ - { - shards: 3, - input: `sum(rate(bar1{baz="blip"}[1m]))`, - expected: `sum without(__cortex_shard__) ( - sum by(__cortex_shard__) (rate(bar1{__cortex_shard__="0_of_3",baz="blip"}[1m])) or - sum by(__cortex_shard__) (rate(bar1{__cortex_shard__="1_of_3",baz="blip"}[1m])) or - sum by(__cortex_shard__) (rate(bar1{__cortex_shard__="2_of_3",baz="blip"}[1m])) - )`, - }, - { - shards: 3, - input: `sum by(foo) (rate(bar1{baz="blip"}[1m]))`, - expected: `sum by(foo) ( - sum by(foo, __cortex_shard__) (rate(bar1{__cortex_shard__="0_of_3",baz="blip"}[1m])) or - sum by(foo, __cortex_shard__) (rate(bar1{__cortex_shard__="1_of_3",baz="blip"}[1m])) or - sum by(foo, __cortex_shard__) (rate(bar1{__cortex_shard__="2_of_3",baz="blip"}[1m])) - )`, - }, - { - shards: 2, - input: `sum( - sum by (foo) (rate(bar1{baz="blip"}[1m])) - / - sum by (foo) (rate(foo{baz="blip"}[1m])) - )`, - expected: `sum( - sum by(foo) ( - sum by(foo, __cortex_shard__) (rate(bar1{__cortex_shard__="0_of_2",baz="blip"}[1m])) or - sum by(foo, __cortex_shard__) (rate(bar1{__cortex_shard__="1_of_2",baz="blip"}[1m])) - ) - / - sum by(foo) ( - sum by(foo, __cortex_shard__) (rate(foo{__cortex_shard__="0_of_2",baz="blip"}[1m])) or - sum by(foo, __cortex_shard__) (rate(foo{__cortex_shard__="1_of_2",baz="blip"}[1m])) - ) - )`, - }, - // This nested sum example is nonsensical, but should not try to shard nested aggregations. - // Instead it only maps the subAggregation but not the outer one. - { - shards: 2, - input: `sum(sum by(foo) (rate(bar1{baz="blip"}[1m])))`, - expected: `sum( - sum by(foo) ( - sum by(foo, __cortex_shard__) (rate(bar1{__cortex_shard__="0_of_2",baz="blip"}[1m])) or - sum by(foo, __cortex_shard__) (rate(bar1{__cortex_shard__="1_of_2",baz="blip"}[1m])) - ) - )`, - }, - // without - { - shards: 2, - input: `sum without(foo) (rate(bar1{baz="blip"}[1m]))`, - expected: `sum without(__cortex_shard__) ( - sum without(foo) (rate(bar1{__cortex_shard__="0_of_2",baz="blip"}[1m])) or - sum without(foo) (rate(bar1{__cortex_shard__="1_of_2",baz="blip"}[1m])) - )`, - }, - // multiple dimensions - { - shards: 2, - input: `sum by(foo, bom) (rate(bar1{baz="blip"}[1m]))`, - expected: `sum by(foo, bom) ( - sum by(foo, bom, __cortex_shard__) (rate(bar1{__cortex_shard__="0_of_2",baz="blip"}[1m])) or - sum by(foo, bom, __cortex_shard__) (rate(bar1{__cortex_shard__="1_of_2",baz="blip"}[1m])) - )`, - }, - // sharding histogram inputs - { - shards: 2, - input: `histogram_quantile(0.9, sum(rate(alertmanager_http_request_duration_seconds_bucket[10m])) by (job, le))`, - expected: `histogram_quantile( - 0.9, - sum by(job, le) ( - sum by(job, le, __cortex_shard__) (rate(alertmanager_http_request_duration_seconds_bucket{__cortex_shard__="0_of_2"}[10m])) or - sum by(job, le, __cortex_shard__) (rate(alertmanager_http_request_duration_seconds_bucket{__cortex_shard__="1_of_2"}[10m])) - ) - )`, - }, - { - // Disallow sharding nested aggregations as they may merge series in a non-associative manner. - shards: 2, - input: `sum(count(foo{}))`, - expected: `sum(count(foo{}))`, - }, - } - - for i, c := range testExpr { - t.Run(fmt.Sprintf("[%d]", i), func(t *testing.T) { - - summer, err := NewShardSummer(c.shards, orSquasher, nil) - require.Nil(t, err) - expr, err := parser.ParseExpr(c.input) - require.Nil(t, err) - res, err := summer.Map(expr) - require.Nil(t, err) - - expected, err := parser.ParseExpr(c.expected) - require.Nil(t, err) - - require.Equal(t, expected.String(), res.String()) - }) - } -} - -func TestShardSummerWithEncoding(t *testing.T) { - for i, c := range []struct { - shards int - input string - expected string - }{ - { - shards: 3, - input: `sum(rate(bar1{baz="blip"}[1m]))`, - expected: `sum without(__cortex_shard__) (__embedded_queries__{__cortex_queries__="{\"Concat\":[\"sum by(__cortex_shard__) (rate(bar1{__cortex_shard__=\\\"0_of_3\\\",baz=\\\"blip\\\"}[1m]))\",\"sum by(__cortex_shard__) (rate(bar1{__cortex_shard__=\\\"1_of_3\\\",baz=\\\"blip\\\"}[1m]))\",\"sum by(__cortex_shard__) (rate(bar1{__cortex_shard__=\\\"2_of_3\\\",baz=\\\"blip\\\"}[1m]))\"]}"})`, - }, - } { - t.Run(fmt.Sprintf("[%d]", i), func(t *testing.T) { - summer, err := NewShardSummer(c.shards, VectorSquasher, nil) - require.Nil(t, err) - expr, err := parser.ParseExpr(c.input) - require.Nil(t, err) - res, err := summer.Map(expr) - require.Nil(t, err) - - expected, err := parser.ParseExpr(c.expected) - require.Nil(t, err) - - require.Equal(t, expected.String(), res.String()) - }) - } -} - -func TestParseShard(t *testing.T) { - var testExpr = []struct { - input string - output ShardAnnotation - err bool - }{ - { - input: "lsdjf", - output: ShardAnnotation{}, - err: true, - }, - { - input: "a_of_3", - output: ShardAnnotation{}, - err: true, - }, - { - input: "3_of_3", - output: ShardAnnotation{}, - err: true, - }, - { - input: "1_of_2", - output: ShardAnnotation{ - Shard: 1, - Of: 2, - }, - }, - } - - for _, c := range testExpr { - t.Run(fmt.Sprint(c.input), func(t *testing.T) { - shard, err := ParseShard(c.input) - if c.err { - require.NotNil(t, err) - } else { - require.Nil(t, err) - require.Equal(t, c.output, shard) - } - }) - } - -} - -func TestShardFromMatchers(t *testing.T) { - var testExpr = []struct { - input []*labels.Matcher - shard *ShardAnnotation - idx int - err bool - }{ - { - input: []*labels.Matcher{ - {}, - { - Name: ShardLabel, - Type: labels.MatchEqual, - Value: ShardAnnotation{ - Shard: 10, - Of: 16, - }.String(), - }, - {}, - }, - shard: &ShardAnnotation{ - Shard: 10, - Of: 16, - }, - idx: 1, - err: false, - }, - { - input: []*labels.Matcher{ - { - Name: ShardLabel, - Type: labels.MatchEqual, - Value: "invalid-fmt", - }, - }, - shard: nil, - idx: 0, - err: true, - }, - { - input: []*labels.Matcher{}, - shard: nil, - idx: 0, - err: false, - }, - } - - for i, c := range testExpr { - t.Run(fmt.Sprint(i), func(t *testing.T) { - shard, idx, err := ShardFromMatchers(c.input) - if c.err { - require.NotNil(t, err) - } else { - require.Nil(t, err) - require.Equal(t, c.shard, shard) - require.Equal(t, c.idx, idx) - } - }) - } - -} diff --git a/pkg/querier/astmapper/subtree_folder.go b/pkg/querier/astmapper/subtree_folder.go deleted file mode 100644 index 4e17ea93097..00000000000 --- a/pkg/querier/astmapper/subtree_folder.go +++ /dev/null @@ -1,91 +0,0 @@ -package astmapper - -import ( - "github.com/prometheus/prometheus/promql/parser" -) - -/* -subtreeFolder is a NodeMapper which embeds an entire parser.Node in an embedded query -if it does not contain any previously embedded queries. This allows the frontend to "zip up" entire -subtrees of an AST that have not already been parallelized. -*/ -type subtreeFolder struct{} - -// NewSubtreeFolder creates a subtreeFolder which can reduce an AST -// to one embedded query if it contains no embedded queries yet -func NewSubtreeFolder() ASTMapper { - return NewASTNodeMapper(&subtreeFolder{}) -} - -// MapNode implements NodeMapper -func (f *subtreeFolder) MapNode(node parser.Node) (parser.Node, bool, error) { - switch n := node.(type) { - // do not attempt to fold number or string leaf nodes - case *parser.NumberLiteral, *parser.StringLiteral: - return n, true, nil - } - - containsEmbedded, err := Predicate(node, predicate(isEmbedded)) - if err != nil { - return nil, true, err - } - - if containsEmbedded { - return node, false, nil - } - - expr, err := VectorSquasher(node) - return expr, true, err -} - -func isEmbedded(node parser.Node) (bool, error) { - switch n := node.(type) { - case *parser.VectorSelector: - if n.Name == EmbeddedQueriesMetricName { - return true, nil - } - - case *parser.MatrixSelector: - return isEmbedded(n.VectorSelector) - } - return false, nil -} - -type predicate = func(parser.Node) (bool, error) - -// Predicate is a helper which uses parser.Walk under the hood determine if any node in a subtree -// returns true for a specified function -func Predicate(node parser.Node, fn predicate) (bool, error) { - v := &visitor{ - fn: fn, - } - - if err := parser.Walk(v, node, nil); err != nil { - return false, err - } - return v.result, nil -} - -type visitor struct { - fn predicate - result bool -} - -// Visit implements parser.Visitor -func (v *visitor) Visit(node parser.Node, path []parser.Node) (parser.Visitor, error) { - // if the visitor has already seen a predicate success, don't overwrite - if v.result { - return nil, nil - } - - var err error - - v.result, err = v.fn(node) - if err != nil { - return nil, err - } - if v.result { - return nil, nil - } - return v, nil -} diff --git a/pkg/querier/astmapper/subtree_folder_test.go b/pkg/querier/astmapper/subtree_folder_test.go deleted file mode 100644 index 19fe4db11d2..00000000000 --- a/pkg/querier/astmapper/subtree_folder_test.go +++ /dev/null @@ -1,113 +0,0 @@ -package astmapper - -import ( - "fmt" - "testing" - - "github.com/pkg/errors" - "github.com/prometheus/prometheus/promql/parser" - "github.com/stretchr/testify/require" -) - -func TestPredicate(t *testing.T) { - for i, tc := range []struct { - input string - fn predicate - expected bool - err bool - }{ - { - input: "selector1{} or selector2{}", - fn: predicate(func(node parser.Node) (bool, error) { - return false, errors.New("some err") - }), - expected: false, - err: true, - }, - { - input: "selector1{} or selector2{}", - fn: predicate(func(node parser.Node) (bool, error) { - return false, nil - }), - expected: false, - err: false, - }, - { - input: "selector1{} or selector2{}", - fn: predicate(func(node parser.Node) (bool, error) { - return true, nil - }), - expected: true, - err: false, - }, - { - input: `sum without(__cortex_shard__) (__embedded_queries__{__cortex_queries__="tstquery"}) or sum(selector)`, - fn: predicate(isEmbedded), - expected: true, - err: false, - }, - } { - t.Run(fmt.Sprintf("%d", i), func(t *testing.T) { - expr, err := parser.ParseExpr(tc.input) - require.Nil(t, err) - - res, err := Predicate(expr.(parser.Node), tc.fn) - if tc.err { - require.Error(t, err) - } else { - require.Nil(t, err) - } - - require.Equal(t, tc.expected, res) - }) - } -} - -func TestSubtreeMapper(t *testing.T) { - for i, tc := range []struct { - input string - expected string - }{ - // embed an entire histogram - { - input: "histogram_quantile(0.5, rate(alertmanager_http_request_duration_seconds_bucket[1m]))", - expected: `__embedded_queries__{__cortex_queries__="{\"Concat\":[\"histogram_quantile(0.5, rate(alertmanager_http_request_duration_seconds_bucket[1m]))\"]}"}`, - }, - // embed a binary expression across two functions - { - input: `rate(http_requests_total{cluster="eu-west2"}[5m]) or rate(http_requests_total{cluster="us-central1"}[5m])`, - expected: `__embedded_queries__{__cortex_queries__="{\"Concat\":[\"rate(http_requests_total{cluster=\\\"eu-west2\\\"}[5m]) or rate(http_requests_total{cluster=\\\"us-central1\\\"}[5m])\"]}"}`, - }, - - // the first leg (histogram) hasn't been embedded at any level, so embed that, but ignore the right leg - // which has already been embedded. - { - input: `sum(histogram_quantile(0.5, rate(selector[1m]))) + - sum without(__cortex_shard__) (__embedded_queries__{__cortex_queries__="tstquery"})`, - expected: ` - __embedded_queries__{__cortex_queries__="{\"Concat\":[\"sum(histogram_quantile(0.5, rate(selector[1m])))\"]}"} + - sum without(__cortex_shard__) (__embedded_queries__{__cortex_queries__="tstquery"}) -`, - }, - // should not embed scalars - { - input: `histogram_quantile(0.5, __embedded_queries__{__cortex_queries__="tstquery"})`, - expected: `histogram_quantile(0.5, __embedded_queries__{__cortex_queries__="tstquery"})`, - }, - } { - t.Run(fmt.Sprintf("[%d]", i), func(t *testing.T) { - mapper := NewSubtreeFolder() - - expr, err := parser.ParseExpr(tc.input) - require.Nil(t, err) - res, err := mapper.Map(expr) - require.Nil(t, err) - - expected, err := parser.ParseExpr(tc.expected) - require.Nil(t, err) - - require.Equal(t, expected.String(), res.String()) - - }) - } -} diff --git a/pkg/querier/tripperware/queryrange/promql_test.go b/pkg/querier/tripperware/queryrange/promql_test.go deleted file mode 100644 index 73f450a4acf..00000000000 --- a/pkg/querier/tripperware/queryrange/promql_test.go +++ /dev/null @@ -1,687 +0,0 @@ -package queryrange - -import ( - "context" - "fmt" - "math" - "sort" - "strings" - "testing" - "time" - - "github.com/go-kit/log" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/prometheus/model/labels" - "github.com/prometheus/prometheus/promql" - "github.com/prometheus/prometheus/storage" - "github.com/stretchr/testify/require" - - "github.com/cortexproject/cortex/pkg/querier/astmapper" -) - -var ( - start = time.Unix(1000, 0) - end = start.Add(3 * time.Minute) - step = 30 * time.Second - ctx = context.Background() - engine = promql.NewEngine(promql.EngineOpts{ - Reg: prometheus.DefaultRegisterer, - Logger: log.NewNopLogger(), - Timeout: 1 * time.Hour, - MaxSamples: 10e6, - ActiveQueryTracker: nil, - }) -) - -// This test allows to verify which PromQL expressions can be parallelized. -func Test_PromQL(t *testing.T) { - t.Parallel() - - var tests = []struct { - normalQuery string - shardQuery string - shouldEqual bool - }{ - // Vector can be parallelized but we need to remove the cortex shard label. - // It should be noted that the __cortex_shard__ label is required by the engine - // and therefore should be returned by the storage. - // Range vectors `bar1{baz="blip"}[1m]` are not tested here because it is not supported - // by range queries. - { - `bar1{baz="blip"}`, - `label_replace( - bar1{__cortex_shard__="0_of_3",baz="blip"} or - bar1{__cortex_shard__="1_of_3",baz="blip"} or - bar1{__cortex_shard__="2_of_3",baz="blip"}, - "__cortex_shard__","","","" - )`, - true, - }, - // __cortex_shard__ label is required otherwise the or will keep only the first series. - { - `sum(bar1{baz="blip"})`, - `sum( - sum (bar1{__cortex_shard__="0_of_3",baz="blip"}) or - sum (bar1{__cortex_shard__="1_of_3",baz="blip"}) or - sum (bar1{__cortex_shard__="2_of_3",baz="blip"}) - )`, - false, - }, - { - `sum(bar1{baz="blip"})`, - `sum( - sum without(__cortex_shard__) (bar1{__cortex_shard__="0_of_3",baz="blip"}) or - sum without(__cortex_shard__) (bar1{__cortex_shard__="1_of_3",baz="blip"}) or - sum without(__cortex_shard__) (bar1{__cortex_shard__="2_of_3",baz="blip"}) - )`, - true, - }, - { - `sum by (foo) (bar1{baz="blip"})`, - `sum by (foo) ( - sum by(foo,__cortex_shard__) (bar1{__cortex_shard__="0_of_3",baz="blip"}) or - sum by(foo,__cortex_shard__) (bar1{__cortex_shard__="1_of_3",baz="blip"}) or - sum by(foo,__cortex_shard__) (bar1{__cortex_shard__="2_of_3",baz="blip"}) - )`, - true, - }, - { - `sum by (foo,bar) (bar1{baz="blip"})`, - `sum by (foo,bar)( - sum by(foo,bar,__cortex_shard__) (bar1{__cortex_shard__="0_of_3",baz="blip"}) or - sum by(foo,bar,__cortex_shard__) (bar1{__cortex_shard__="1_of_3",baz="blip"}) or - sum by(foo,bar,__cortex_shard__) (bar1{__cortex_shard__="2_of_3",baz="blip"}) - )`, - true, - }, - // since series are unique to a shard, it's safe to sum without shard first, then reaggregate - { - `sum without (foo,bar) (bar1{baz="blip"})`, - `sum without (foo,bar)( - sum without(__cortex_shard__) (bar1{__cortex_shard__="0_of_3",baz="blip"}) or - sum without(__cortex_shard__) (bar1{__cortex_shard__="1_of_3",baz="blip"}) or - sum without(__cortex_shard__) (bar1{__cortex_shard__="2_of_3",baz="blip"}) - )`, - true, - }, - { - `min by (foo,bar) (bar1{baz="blip"})`, - `min by (foo,bar)( - min by(foo,bar,__cortex_shard__) (bar1{__cortex_shard__="0_of_3",baz="blip"}) or - min by(foo,bar,__cortex_shard__) (bar1{__cortex_shard__="1_of_3",baz="blip"}) or - min by(foo,bar,__cortex_shard__) (bar1{__cortex_shard__="2_of_3",baz="blip"}) - )`, - true, - }, - { - `max by (foo,bar) (bar1{baz="blip"})`, - ` max by (foo,bar)( - max by(foo,bar,__cortex_shard__) (bar1{__cortex_shard__="0_of_3",baz="blip"}) or - max by(foo,bar,__cortex_shard__) (bar1{__cortex_shard__="1_of_3",baz="blip"}) or - max by(foo,bar,__cortex_shard__) (bar1{__cortex_shard__="2_of_3",baz="blip"}) - )`, - true, - }, - // avg generally cant be parallelized - { - `avg(bar1{baz="blip"})`, - `avg( - avg by(__cortex_shard__) (bar1{__cortex_shard__="0_of_3",baz="blip"}) or - avg by(__cortex_shard__) (bar1{__cortex_shard__="1_of_3",baz="blip"}) or - avg by(__cortex_shard__) (bar1{__cortex_shard__="2_of_3",baz="blip"}) - )`, - false, - }, - // stddev can't be parallelized. - { - `stddev(bar1{baz="blip"})`, - ` stddev( - stddev by(__cortex_shard__) (bar1{__cortex_shard__="0_of_3",baz="blip"}) or - stddev by(__cortex_shard__) (bar1{__cortex_shard__="1_of_3",baz="blip"}) or - stddev by(__cortex_shard__) (bar1{__cortex_shard__="2_of_3",baz="blip"}) - )`, - false, - }, - // stdvar can't be parallelized. - { - `stdvar(bar1{baz="blip"})`, - `stdvar( - stdvar by(__cortex_shard__) (bar1{__cortex_shard__="0_of_3",baz="blip"}) or - stdvar by(__cortex_shard__) (bar1{__cortex_shard__="1_of_3",baz="blip"}) or - stdvar by(__cortex_shard__) (bar1{__cortex_shard__="2_of_3",baz="blip"}) - )`, - false, - }, - { - `count(bar1{baz="blip"})`, - `count( - count without (__cortex_shard__) (bar1{__cortex_shard__="0_of_3",baz="blip"}) or - count without (__cortex_shard__) (bar1{__cortex_shard__="1_of_3",baz="blip"}) or - count without (__cortex_shard__) (bar1{__cortex_shard__="2_of_3",baz="blip"}) - )`, - true, - }, - { - `count by (foo,bar) (bar1{baz="blip"})`, - `count by (foo,bar) ( - count by (foo,bar,__cortex_shard__) (bar1{__cortex_shard__="0_of_3",baz="blip"}) or - count by (foo,bar,__cortex_shard__) (bar1{__cortex_shard__="1_of_3",baz="blip"}) or - count by (foo,bar,__cortex_shard__) (bar1{__cortex_shard__="2_of_3",baz="blip"}) - )`, - true, - }, - // different ways to represent count without. - { - `count without (foo) (bar1{baz="blip"})`, - `count without (foo) ( - count without (__cortex_shard__) (bar1{__cortex_shard__="0_of_3",baz="blip"}) or - count without (__cortex_shard__) (bar1{__cortex_shard__="1_of_3",baz="blip"}) or - count without (__cortex_shard__) (bar1{__cortex_shard__="2_of_3",baz="blip"}) - )`, - true, - }, - { - `count without (foo) (bar1{baz="blip"})`, - `sum without (__cortex_shard__) ( - count without (foo) (bar1{__cortex_shard__="0_of_3",baz="blip"}) or - count without (foo) (bar1{__cortex_shard__="1_of_3",baz="blip"}) or - count without (foo) (bar1{__cortex_shard__="2_of_3",baz="blip"}) - )`, - true, - }, - { - `count without (foo, bar) (bar1{baz="blip"})`, - `count without (foo, bar) ( - count without (__cortex_shard__) (bar1{__cortex_shard__="0_of_3",baz="blip"}) or - count without (__cortex_shard__) (bar1{__cortex_shard__="1_of_3",baz="blip"}) or - count without (__cortex_shard__) (bar1{__cortex_shard__="2_of_3",baz="blip"}) - )`, - true, - }, - { - `topk(2,bar1{baz="blip"})`, - `label_replace( - topk(2, - topk(2,(bar1{__cortex_shard__="0_of_3",baz="blip"})) without(__cortex_shard__) or - topk(2,(bar1{__cortex_shard__="1_of_3",baz="blip"})) without(__cortex_shard__) or - topk(2,(bar1{__cortex_shard__="2_of_3",baz="blip"})) without(__cortex_shard__) - ), - "__cortex_shard__","","","")`, - true, - }, - { - `bottomk(2,bar1{baz="blip"})`, - `label_replace( - bottomk(2, - bottomk(2,(bar1{__cortex_shard__="0_of_3",baz="blip"})) without(__cortex_shard__) or - bottomk(2,(bar1{__cortex_shard__="1_of_3",baz="blip"})) without(__cortex_shard__) or - bottomk(2,(bar1{__cortex_shard__="2_of_3",baz="blip"})) without(__cortex_shard__) - ), - "__cortex_shard__","","","")`, - true, - }, - { - `sum by (foo,bar) (avg_over_time(bar1{baz="blip"}[1m]))`, - `sum by (foo,bar)( - sum by(foo,bar,__cortex_shard__) (avg_over_time(bar1{__cortex_shard__="0_of_3",baz="blip"}[1m])) or - sum by(foo,bar,__cortex_shard__) (avg_over_time(bar1{__cortex_shard__="1_of_3",baz="blip"}[1m])) or - sum by(foo,bar,__cortex_shard__) (avg_over_time(bar1{__cortex_shard__="2_of_3",baz="blip"}[1m])) - )`, - true, - }, - { - `sum by (foo,bar) (min_over_time(bar1{baz="blip"}[1m]))`, - `sum by (foo,bar)( - sum by(foo,bar,__cortex_shard__) (min_over_time(bar1{__cortex_shard__="0_of_3",baz="blip"}[1m])) or - sum by(foo,bar,__cortex_shard__) (min_over_time(bar1{__cortex_shard__="1_of_3",baz="blip"}[1m])) or - sum by(foo,bar,__cortex_shard__) (min_over_time(bar1{__cortex_shard__="2_of_3",baz="blip"}[1m])) - )`, - true, - }, - { - // Sub aggregations must avoid non-associative series merging across shards - `sum( - count( - bar1 - ) by (foo,bazz) - )`, - ` - sum without(__cortex_shard__) ( - sum by(__cortex_shard__) ( - count by(foo, bazz) (foo{__cortex_shard__="0_of_2",bar="baz"}) - ) or - sum by(__cortex_shard__) ( - count by(foo, bazz) (foo{__cortex_shard__="1_of_2",bar="baz"}) - ) - ) -`, - false, - }, - { - // Note: this is a speculative optimization that we don't currently include due to mapping complexity. - // Certain sub aggregations may inject __cortex_shard__ for all (by) subgroupings. - // This is the same as the previous test with the exception that the shard label is injected to the count grouping - `sum( - count( - bar1 - ) by (foo,bazz) - )`, - ` - sum without(__cortex_shard__) ( - sum by(__cortex_shard__) ( - count by(foo, bazz, __cortex_shard__) (foo{__cortex_shard__="0_of_2",bar="baz"}) - ) or - sum by(__cortex_shard__) ( - count by(foo, bazz, __cortex_shard__) (foo{__cortex_shard__="1_of_2",bar="baz"}) - ) - ) -`, - true, - }, - { - // Note: this is a speculative optimization that we don't currently include due to mapping complexity - // This example details multiple layers of aggregations. - // Sub aggregations must inject __cortex_shard__ for all (by) subgroupings. - `sum( - count( - count( - bar1 - ) by (foo,bazz) - ) by (bazz) - )`, - ` - sum without(__cortex_shard__) ( - sum by(__cortex_shard__) ( - count by(bazz, __cortex_shard__) ( - count by(foo, bazz, __cortex_shard__) ( - foo{__cortex_shard__="0_of_2", bar="baz"} - ) - ) - ) or - sum by(__cortex_shard__) ( - count by(bazz, __cortex_shard__) ( - count by(foo, bazz, __cortex_shard__) ( - foo{__cortex_shard__="1_of_2", bar="baz"} - ) - ) - ) - ) -`, - true, - }, - } - - for _, tt := range tests { - tt := tt - t.Run(tt.normalQuery, func(t *testing.T) { - - baseQuery, err := engine.NewRangeQuery(shardAwareQueryable, nil, tt.normalQuery, start, end, step) - require.Nil(t, err) - shardQuery, err := engine.NewRangeQuery(shardAwareQueryable, nil, tt.shardQuery, start, end, step) - require.Nil(t, err) - baseResult := baseQuery.Exec(ctx) - shardResult := shardQuery.Exec(ctx) - t.Logf("base: %v\n", baseResult) - t.Logf("shard: %v\n", shardResult) - if tt.shouldEqual { - require.Equal(t, baseResult, shardResult) - return - } - require.NotEqual(t, baseResult, shardResult) - }) - } - -} - -func Test_FunctionParallelism(t *testing.T) { - tpl := `sum((bar1{}))` - shardTpl := `sum( - sum without(__cortex_shard__) ((bar1{__cortex_shard__="0_of_3"})) or - sum without(__cortex_shard__) ((bar1{__cortex_shard__="1_of_3"})) or - sum without(__cortex_shard__) ((bar1{__cortex_shard__="2_of_3"})) - )` - - mkQuery := func(tpl, fn string, testMatrix bool, fArgs []string) (result string) { - result = strings.Replace(tpl, "", fn, -1) - - if testMatrix { - // turn selectors into ranges - result = strings.Replace(result, "}", "}[1m]", -1) - } - - if len(fArgs) > 0 { - args := "," + strings.Join(fArgs, ",") - result = strings.Replace(result, "", args, -1) - } else { - result = strings.Replace(result, "", "", -1) - } - - return result - } - - for _, tc := range []struct { - fn string - fArgs []string - isTestMatrix bool - approximate bool - }{ - { - fn: "abs", - }, - { - fn: "avg_over_time", - isTestMatrix: true, - approximate: true, - }, - { - fn: "ceil", - }, - { - fn: "changes", - isTestMatrix: true, - }, - { - fn: "count_over_time", - isTestMatrix: true, - }, - { - fn: "days_in_month", - }, - { - fn: "day_of_month", - }, - { - fn: "day_of_week", - }, - { - fn: "delta", - isTestMatrix: true, - approximate: true, - }, - { - fn: "deriv", - isTestMatrix: true, - approximate: true, - }, - { - fn: "exp", - approximate: true, - }, - { - fn: "floor", - }, - { - fn: "hour", - }, - { - fn: "idelta", - isTestMatrix: true, - approximate: true, - }, - { - fn: "increase", - isTestMatrix: true, - approximate: true, - }, - { - fn: "irate", - isTestMatrix: true, - approximate: true, - }, - { - fn: "ln", - approximate: true, - }, - { - fn: "log10", - approximate: true, - }, - { - fn: "log2", - approximate: true, - }, - { - fn: "max_over_time", - isTestMatrix: true, - }, - { - fn: "min_over_time", - isTestMatrix: true, - }, - { - fn: "minute", - }, - { - fn: "month", - }, - { - fn: "rate", - isTestMatrix: true, - approximate: true, - }, - { - fn: "resets", - isTestMatrix: true, - }, - { - fn: "sort", - }, - { - fn: "sort_desc", - }, - { - fn: "sqrt", - approximate: true, - }, - { - fn: "stddev_over_time", - isTestMatrix: true, - approximate: true, - }, - { - fn: "stdvar_over_time", - isTestMatrix: true, - approximate: true, - }, - { - fn: "sum_over_time", - isTestMatrix: true, - }, - { - fn: "timestamp", - }, - { - fn: "year", - }, - { - fn: "clamp_max", - fArgs: []string{"5"}, - }, - { - fn: "clamp_min", - fArgs: []string{"5"}, - }, - { - fn: "predict_linear", - isTestMatrix: true, - approximate: true, - fArgs: []string{"1"}, - }, - { - fn: "round", - fArgs: []string{"20"}, - }, - { - fn: "holt_winters", - isTestMatrix: true, - fArgs: []string{"0.5", "0.7"}, - approximate: true, - }, - } { - - t.Run(tc.fn, func(t *testing.T) { - baseQuery, err := engine.NewRangeQuery( - shardAwareQueryable, - nil, - mkQuery(tpl, tc.fn, tc.isTestMatrix, tc.fArgs), - start, - end, - step, - ) - require.Nil(t, err) - shardQuery, err := engine.NewRangeQuery( - shardAwareQueryable, - nil, - mkQuery(shardTpl, tc.fn, tc.isTestMatrix, tc.fArgs), - start, - end, - step, - ) - require.Nil(t, err) - baseResult := baseQuery.Exec(ctx) - shardResult := shardQuery.Exec(ctx) - t.Logf("base: %+v\n", baseResult) - t.Logf("shard: %+v\n", shardResult) - if !tc.approximate { - require.Equal(t, baseResult, shardResult) - } else { - // Some functions yield tiny differences when sharded due to combining floating point calculations. - baseSeries := baseResult.Value.(promql.Matrix)[0] - shardSeries := shardResult.Value.(promql.Matrix)[0] - - require.Equal(t, len(baseSeries.Points), len(shardSeries.Points)) - for i, basePt := range baseSeries.Points { - shardPt := shardSeries.Points[i] - require.Equal(t, basePt.T, shardPt.T) - require.Equal( - t, - math.Round(basePt.V*1e6)/1e6, - math.Round(shardPt.V*1e6)/1e6, - ) - } - - } - }) - } - -} - -var shardAwareQueryable = storage.QueryableFunc(func(ctx context.Context, mint, maxt int64) (storage.Querier, error) { - return &testMatrix{ - series: []*promql.StorageSeries{ - newSeries(labels.Labels{{Name: "__name__", Value: "bar1"}, {Name: "baz", Value: "blip"}, {Name: "bar", Value: "blop"}, {Name: "foo", Value: "barr"}}, factor(5)), - newSeries(labels.Labels{{Name: "__name__", Value: "bar1"}, {Name: "baz", Value: "blip"}, {Name: "bar", Value: "blop"}, {Name: "foo", Value: "bazz"}}, factor(7)), - newSeries(labels.Labels{{Name: "__name__", Value: "bar1"}, {Name: "baz", Value: "blip"}, {Name: "bar", Value: "blap"}, {Name: "foo", Value: "buzz"}}, factor(12)), - newSeries(labels.Labels{{Name: "__name__", Value: "bar1"}, {Name: "baz", Value: "blip"}, {Name: "bar", Value: "blap"}, {Name: "foo", Value: "bozz"}}, factor(11)), - newSeries(labels.Labels{{Name: "__name__", Value: "bar1"}, {Name: "baz", Value: "blip"}, {Name: "bar", Value: "blop"}, {Name: "foo", Value: "buzz"}}, factor(8)), - newSeries(labels.Labels{{Name: "__name__", Value: "bar1"}, {Name: "baz", Value: "blip"}, {Name: "bar", Value: "blap"}, {Name: "foo", Value: "bazz"}}, identity), - }, - }, nil -}) - -type testMatrix struct { - series []*promql.StorageSeries -} - -func (m *testMatrix) Copy() *testMatrix { - cpy := *m - return &cpy -} - -func (m testMatrix) Next() bool { return len(m.series) != 0 } - -func (m *testMatrix) At() storage.Series { - res := m.series[0] - m.series = m.series[1:] - return res -} - -func (m *testMatrix) Err() error { return nil } - -func (m *testMatrix) Warnings() storage.Warnings { return nil } - -func (m *testMatrix) Select(_ bool, selectParams *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet { - s, _, err := astmapper.ShardFromMatchers(matchers) - if err != nil { - return storage.ErrSeriesSet(err) - } - - if s != nil { - return splitByShard(s.Shard, s.Of, m) - } - - return m.Copy() -} - -func (m *testMatrix) LabelValues(name string, matchers ...*labels.Matcher) ([]string, storage.Warnings, error) { - return nil, nil, nil -} -func (m *testMatrix) LabelNames(matchers ...*labels.Matcher) ([]string, storage.Warnings, error) { - return nil, nil, nil -} -func (m *testMatrix) Close() error { return nil } - -func newSeries(metric labels.Labels, generator func(float64) float64) *promql.StorageSeries { - sort.Sort(metric) - var points []promql.Point - - for ts := start.Add(-step); ts.Unix() <= end.Unix(); ts = ts.Add(step) { - t := ts.Unix() * 1e3 - points = append(points, promql.Point{ - T: t, - V: generator(float64(t)), - }) - } - - return promql.NewStorageSeries(promql.Series{ - Metric: metric, - Points: points, - }) -} - -func identity(t float64) float64 { - return float64(t) -} - -func factor(f float64) func(float64) float64 { - i := 0. - return func(float64) float64 { - i++ - res := i * f - return res - } -} - -// var identity(t int64) float64 { -// return float64(t) -// } - -// splitByShard returns the shard subset of a testMatrix. -// e.g if a testMatrix has 6 series, and we want 3 shard, then each shard will contain -// 2 series. -func splitByShard(shardIndex, shardTotal int, testMatrices *testMatrix) *testMatrix { - res := &testMatrix{} - for i, s := range testMatrices.series { - if i%shardTotal != shardIndex { - continue - } - var points []promql.Point - it := s.Iterator() - for it.Next() { - t, v := it.At() - points = append(points, promql.Point{ - T: t, - V: v, - }) - - } - lbs := s.Labels().Copy() - lbs = append(lbs, labels.Label{Name: "__cortex_shard__", Value: fmt.Sprintf("%d_of_%d", shardIndex, shardTotal)}) - sort.Sort(lbs) - res.series = append(res.series, promql.NewStorageSeries(promql.Series{ - Metric: lbs, - Points: points, - })) - } - return res -} diff --git a/pkg/querier/tripperware/queryrange/test_utils.go b/pkg/querier/tripperware/queryrange/test_utils.go index 4b19b39dad4..6e198baebbc 100644 --- a/pkg/querier/tripperware/queryrange/test_utils.go +++ b/pkg/querier/tripperware/queryrange/test_utils.go @@ -1,17 +1,9 @@ package queryrange import ( - "context" "fmt" - "time" - "github.com/pkg/errors" - "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" - "github.com/prometheus/prometheus/storage" - - "github.com/cortexproject/cortex/pkg/querier/astmapper" - "github.com/cortexproject/cortex/pkg/querier/series" ) // genLabels will create a slice of labels where each label has an equal chance to occupy a value from [0,labelBuckets]. It returns a slice of length labelBuckets^len(labelSet) @@ -44,142 +36,3 @@ func genLabels( return result } - -// NewMockShardedQueryable creates a shard-aware in memory queryable. -func NewMockShardedQueryable( - nSamples int, - labelSet []string, - labelBuckets int, - delayPerSeries time.Duration, -) *MockShardedQueryable { - samples := make([]model.SamplePair, 0, nSamples) - for i := 0; i < nSamples; i++ { - samples = append(samples, model.SamplePair{ - Timestamp: model.Time(i * 1000), - Value: model.SampleValue(i), - }) - } - sets := genLabels(labelSet, labelBuckets) - xs := make([]storage.Series, 0, len(sets)) - for _, ls := range sets { - xs = append(xs, series.NewConcreteSeries(ls, samples)) - } - - return &MockShardedQueryable{ - series: xs, - delayPerSeries: delayPerSeries, - } -} - -// MockShardedQueryable is exported to be reused in the querysharding benchmarking -type MockShardedQueryable struct { - series []storage.Series - delayPerSeries time.Duration -} - -// Querier impls storage.Queryable -func (q *MockShardedQueryable) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) { - return q, nil -} - -// Select implements storage.Querier interface. -// The bool passed is ignored because the series is always sorted. -func (q *MockShardedQueryable) Select(_ bool, _ *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet { - tStart := time.Now() - - shard, _, err := astmapper.ShardFromMatchers(matchers) - if err != nil { - return storage.ErrSeriesSet(err) - } - - var ( - start int - end int - ) - - if shard == nil { - start = 0 - end = len(q.series) - } else { - // return the series range associated with this shard - seriesPerShard := len(q.series) / shard.Of - start = shard.Shard * seriesPerShard - end = start + seriesPerShard - - // if we're clipping an odd # of series, add the final series to the last shard - if end == len(q.series)-1 && len(q.series)%2 == 1 { - end = len(q.series) - } - } - - var name string - for _, m := range matchers { - if m.Type == labels.MatchEqual && m.Name == "__name__" { - name = m.Value - } - } - - results := make([]storage.Series, 0, end-start) - for i := start; i < end; i++ { - results = append(results, &ShardLabelSeries{ - shard: shard, - name: name, - Series: q.series[i], - }) - } - - // loosely enforce the assumption that an operation on 1/nth of the data - // takes 1/nth of the time. - duration := q.delayPerSeries * time.Duration(len(q.series)) - if shard != nil { - duration = duration / time.Duration(shard.Of) - } - - remaining := time.Until(tStart.Add(duration)) - if remaining > 0 { - time.Sleep(remaining) - } - - // sorted - return series.NewConcreteSeriesSet(results) -} - -// ShardLabelSeries allows extending a Series with new labels. This is helpful for adding cortex shard labels -type ShardLabelSeries struct { - shard *astmapper.ShardAnnotation - name string - storage.Series -} - -// Labels impls storage.Series -func (s *ShardLabelSeries) Labels() labels.Labels { - ls := s.Series.Labels() - - if s.name != "" { - ls = append(ls, labels.Label{ - Name: "__name__", - Value: s.name, - }) - } - - if s.shard != nil { - ls = append(ls, s.shard.Label()) - } - - return ls -} - -// LabelValues impls storage.Querier -func (q *MockShardedQueryable) LabelValues(name string, matchers ...*labels.Matcher) ([]string, storage.Warnings, error) { - return nil, nil, errors.Errorf("unimplemented") -} - -// LabelNames returns all the unique label names present in the block in sorted order. -func (q *MockShardedQueryable) LabelNames(matchers ...*labels.Matcher) ([]string, storage.Warnings, error) { - return nil, nil, errors.Errorf("unimplemented") -} - -// Close releases the resources of the Querier. -func (q *MockShardedQueryable) Close() error { - return nil -} diff --git a/pkg/querier/tripperware/queryrange/test_utils_test.go b/pkg/querier/tripperware/queryrange/test_utils_test.go index 075611c016c..fc95edc6c35 100644 --- a/pkg/querier/tripperware/queryrange/test_utils_test.go +++ b/pkg/querier/tripperware/queryrange/test_utils_test.go @@ -7,8 +7,6 @@ import ( "github.com/prometheus/prometheus/model/labels" "github.com/stretchr/testify/require" - - "github.com/cortexproject/cortex/pkg/querier/astmapper" ) func TestGenLabelsCorrectness(t *testing.T) { @@ -83,53 +81,3 @@ func TestGenLabelsSize(t *testing.T) { ) } } - -func TestNewMockShardedqueryable(t *testing.T) { - for _, tc := range []struct { - shards, nSamples, labelBuckets int - labelSet []string - }{ - { - nSamples: 100, - shards: 1, - labelBuckets: 3, - labelSet: []string{"a", "b", "c"}, - }, - { - nSamples: 0, - shards: 2, - labelBuckets: 3, - labelSet: []string{"a", "b", "c"}, - }, - } { - q := NewMockShardedQueryable(tc.nSamples, tc.labelSet, tc.labelBuckets, 0) - expectedSeries := int(math.Pow(float64(tc.labelBuckets), float64(len(tc.labelSet)))) - - seriesCt := 0 - for i := 0; i < tc.shards; i++ { - - set := q.Select(false, nil, &labels.Matcher{ - Type: labels.MatchEqual, - Name: astmapper.ShardLabel, - Value: astmapper.ShardAnnotation{ - Shard: i, - Of: tc.shards, - }.String(), - }) - - require.Nil(t, set.Err()) - - for set.Next() { - seriesCt++ - iter := set.At().Iterator() - samples := 0 - for iter.Next() { - samples++ - } - require.Equal(t, tc.nSamples, samples) - } - - } - require.Equal(t, expectedSeries, seriesCt) - } -}