Commit ef17bb3 (1 parent: 821304f)

logical plan fragmentation implementation

Signed-off-by: rubywtl <[email protected]>

File tree: 8 files changed, +285 -115 lines

pkg/distributed_execution/codec_test.go

Lines changed: 1 addition & 1 deletion
@@ -34,7 +34,7 @@ func TestUnmarshalWithLogicalPlan(t *testing.T) {
 
     for _, tc := range testCases {
         t.Run(tc.name, func(t *testing.T) {
-            plan, _, err := CreateTestLogicalPlan(tc.query, start, end, step)
+            plan, err := CreateTestLogicalPlan(tc.query, start, end, step)
             require.NoError(t, err)
             require.NotNil(t, plan)
 

pkg/distributed_execution/distributed_optimizer.go

Lines changed: 2 additions & 2 deletions
@@ -1,10 +1,9 @@
 package distributed_execution
 
 import (
-    "github.com/thanos-io/promql-engine/query"
-
     "github.com/prometheus/prometheus/util/annotations"
     "github.com/thanos-io/promql-engine/logicalplan"
+    "github.com/thanos-io/promql-engine/query"
 )
 
 // This is a simplified implementation that only handles binary aggregation cases
@@ -18,6 +17,7 @@ type DistributedOptimizer struct{}
 func (d *DistributedOptimizer) Optimize(root logicalplan.Node, opts *query.Options) (logicalplan.Node, annotations.Annotations) {
     warns := annotations.New()
 
+    // insert remote nodes
     logicalplan.TraverseBottomUp(nil, &root, func(parent, current *logicalplan.Node) bool {
 
         if (*current).Type() == logicalplan.BinaryNode && d.hasAggregation(current) {
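
Note: the second hunk is cut off before the body of the if branch, so the actual Remote-node insertion is not shown above. As a rough sketch only (not the commit's exact code), the rewrite the comment describes could look like the snippet below, written as if inside Optimize in package distributed_execution; Remote, FragmentKey and hasAggregation are names taken from other files in this commit.

    // sketch: wrap each operand of a binary aggregation in a Remote node so the
    // fragmenter can later cut the plan at these boundaries; the FragmentKey is
    // left empty here and is filled in by the fragmenter via MakeFragmentKey.
    logicalplan.TraverseBottomUp(nil, &root, func(parent, current *logicalplan.Node) bool {
        if (*current).Type() == logicalplan.BinaryNode && d.hasAggregation(current) {
            for _, child := range (*current).Children() {
                *child = &Remote{Expr: *child}
            }
        }
        return false // keep traversing so nested binary nodes are handled too
    })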

pkg/distributed_execution/distributed_optimizer_test.go

Lines changed: 1 addition & 49 deletions
@@ -4,10 +4,8 @@ import (
     "testing"
     "time"
 
-    "github.com/prometheus/prometheus/promql/parser"
     "github.com/stretchr/testify/require"
     "github.com/thanos-io/promql-engine/logicalplan"
-    "github.com/thanos-io/promql-engine/query"
 )
 
 func TestDistributedOptimizer(t *testing.T) {
@@ -58,7 +56,7 @@ func TestDistributedOptimizer(t *testing.T) {
 
     for _, tc := range testCases {
         t.Run(tc.name, func(t *testing.T) {
-            lp, _, err := CreateTestLogicalPlan(tc.query, now, now, time.Minute)
+            lp, err := CreateTestLogicalPlan(tc.query, now, now, time.Minute)
             require.NoError(t, err)
 
             node := (*lp).Root()
@@ -75,49 +73,3 @@ func TestDistributedOptimizer(t *testing.T) {
         })
     }
 }
-
-func getStartAndEnd(start time.Time, end time.Time, step time.Duration) (time.Time, time.Time) {
-    if step == 0 {
-        return start, start
-    }
-    return start, end
-}
-
-func CreateTestLogicalPlan(qs string, start time.Time, end time.Time, step time.Duration) (*logicalplan.Plan, query.Options, error) {
-
-    start, end = getStartAndEnd(start, end, step)
-
-    qOpts := query.Options{
-        Start:      start,
-        End:        end,
-        Step:       step,
-        StepsBatch: 10,
-        NoStepSubqueryIntervalFn: func(duration time.Duration) time.Duration {
-            return 0
-        },
-        LookbackDelta:      0,
-        EnablePerStepStats: false,
-    }
-
-    expr, err := parser.NewParser(qs, parser.WithFunctions(parser.Functions)).ParseExpr()
-    if err != nil {
-        return nil, qOpts, err
-    }
-
-    planOpts := logicalplan.PlanOptions{
-        DisableDuplicateLabelCheck: false,
-    }
-
-    logicalPlan, err := logicalplan.NewFromAST(expr, &qOpts, planOpts)
-    if err != nil {
-        return nil, qOpts, err
-    }
-    optimizedPlan, _ := logicalPlan.Optimize(logicalplan.DefaultOptimizers)
-
-    distributedOptimizer := DistributedOptimizer{}
-    dOptimizedNode, _ := distributedOptimizer.Optimize(optimizedPlan.Root(), &qOpts)
-
-    plan := logicalplan.New(dOptimizedNode, &qOpts, planOpts)
-
-    return &plan, qOpts, nil
-}

pkg/distributed_execution/plan_fragments/fragmenter.go

Lines changed: 91 additions & 15 deletions
@@ -1,27 +1,103 @@
 package plan_fragments
 
-import "github.com/thanos-io/promql-engine/logicalplan"
+import (
+    "encoding/binary"
+
+    "github.com/google/uuid"
+    "github.com/thanos-io/promql-engine/logicalplan"
+
+    "github.com/cortexproject/cortex/pkg/distributed_execution"
+)
 
 // Fragmenter interface
 type Fragmenter interface {
     // Fragment function fragments the logical query plan and will always return the fragment in the order of child-to-root
     // in other words, the order of the fragment in the array will be the order they are being scheduled
-    Fragment(node logicalplan.Node) ([]Fragment, error)
+    Fragment(queryID uint64, node logicalplan.Node) ([]Fragment, error)
 }
 
-type DummyFragmenter struct {
+func getNewID() uint64 {
+    id := uuid.New()
+    return binary.BigEndian.Uint64(id[:8])
 }
 
-func (f *DummyFragmenter) Fragment(node logicalplan.Node) ([]Fragment, error) {
-    // simple logic without distributed optimizer
-    return []Fragment{
-        {
-            Node:       node,
-            FragmentID: uint64(1),
-            ChildIDs:   []uint64{},
-            IsRoot:     true,
-        },
-    }, nil
+type PlanFragmenter struct {
+}
+
+func (f *PlanFragmenter) Fragment(queryID uint64, node logicalplan.Node) ([]Fragment, error) {
+    fragments := []Fragment{}
+
+    nodeToFragmentID := make(map[*logicalplan.Node]uint64)
+    nodeToSubtreeFragmentIDs := make(map[*logicalplan.Node][]uint64)
+
+    logicalplan.TraverseBottomUp(nil, &node, func(parent, current *logicalplan.Node) bool {
+        childFragmentIDs := make(map[uint64]bool)
+        children := (*current).Children()
+
+        for _, child := range children {
+            if subtreeIDs, exists := nodeToSubtreeFragmentIDs[child]; exists {
+                for _, fragmentID := range subtreeIDs {
+                    childFragmentIDs[fragmentID] = true
+                }
+            }
+        }
+
+        childIDs := make([]uint64, 0, len(childFragmentIDs))
+        for fragmentID := range childFragmentIDs {
+            childIDs = append(childIDs, fragmentID)
+        }
+
+        if parent == nil { // root fragment
+            newFragment := Fragment{
+                Node:       *current,
+                FragmentID: getNewID(),
+                ChildIDs:   childIDs,
+                IsRoot:     true,
+            }
+            fragments = append(fragments, newFragment)
+
+            // cache subtree fragment IDs for this node
+            nodeToSubtreeFragmentIDs[current] = childIDs
+
+        } else if distributed_execution.RemoteNode == (*current).Type() {
+            remoteNode := (*current).(*distributed_execution.Remote)
+            fragmentID := getNewID()
+            nodeToFragmentID[current] = fragmentID
+
+            // Set the fragment key for the remote node
+            key := distributed_execution.MakeFragmentKey(queryID, fragmentID)
+            remoteNode.FragmentKey = key
+
+            newFragment := Fragment{
+                Node:       remoteNode.Expr,
+                FragmentID: fragmentID,
+                ChildIDs:   childIDs,
+                IsRoot:     false,
+            }
+
+            fragments = append(fragments, newFragment)
+
+            subtreeIDs := append([]uint64{fragmentID})
+            nodeToSubtreeFragmentIDs[current] = subtreeIDs
+        } else {
+            nodeToSubtreeFragmentIDs[current] = childIDs
+        }
+
+        return false
+    })
+
+    if len(fragments) > 0 {
+        return fragments, nil
+    }
+
+    // for non-query API calls
+    // --> treat as root fragment and immediately return the result
+    return []Fragment{{
+        Node:       node,
+        FragmentID: uint64(0),
+        ChildIDs:   []uint64{},
+        IsRoot:     true,
+    }}, nil
 }
 
 type Fragment struct {
@@ -47,6 +123,6 @@ func (s *Fragment) IsEmpty() bool {
     return true
 }
 
-func NewDummyFragmenter() Fragmenter {
-    return &DummyFragmenter{}
+func NewPlanFragmenter() Fragmenter {
+    return &PlanFragmenter{}
 }
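
The fragmenter returns fragments child-to-root, as documented on the Fragmenter interface above. A minimal usage sketch under that contract, assuming a plan whose Remote nodes were already inserted by the DistributedOptimizer; scheduleFragments and enqueue are hypothetical names, not part of this commit:

    package example

    import (
        "github.com/thanos-io/promql-engine/logicalplan"

        "github.com/cortexproject/cortex/pkg/distributed_execution/plan_fragments"
    )

    // scheduleFragments walks the fragments in the order they are returned: every ID in
    // f.ChildIDs refers to a fragment that appears earlier in the slice, and the last
    // fragment is the root (IsRoot == true).
    func scheduleFragments(queryID uint64, root logicalplan.Node, enqueue func(plan_fragments.Fragment)) error {
        fragments, err := plan_fragments.NewPlanFragmenter().Fragment(queryID, root)
        if err != nil {
            return err
        }
        for _, f := range fragments {
            enqueue(f) // children first, root last
        }
        return nil
    }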

pkg/distributed_execution/plan_fragments/fragmenter_test.go

Lines changed: 100 additions & 17 deletions
@@ -6,41 +6,124 @@ import (
 
     "github.com/stretchr/testify/require"
 
-    "github.com/cortexproject/cortex/pkg/util/logical_plan"
+    "github.com/cortexproject/cortex/pkg/distributed_execution"
 )
 
+// Tests fragmentation of logical plans, verifying that the fragments contain correct metadata.
+// Note: The number of fragments is determined by the distributed optimizer's strategy -
+// if the optimizer logic changes, this test will need to be updated accordingly.
 func TestFragmenter(t *testing.T) {
     type testCase struct {
-        name              string
-        query             string
-        start             time.Time
-        end               time.Time
-        expectedFragments int
+        name                 string
+        query                string
+        start                time.Time
+        end                  time.Time
+        expectedFragmentsCnt int
+        expectedFragments    []Fragment
     }
 
     now := time.Now()
-
-    // more tests will be added when distributed optimizer and fragmenter are implemented
     tests := []testCase{
         {
-            name:              "simple logical query plan - no fragmentation",
-            query:             "up",
-            start:             now,
-            end:               now,
-            expectedFragments: 1,
+            name:                 "simple logical query plan - no fragmentation",
+            query:                "up",
+            start:                now,
+            end:                  now,
+            expectedFragmentsCnt: 1,
+        },
+        {
+            name:                 "binary operation with aggregations",
+            query:                "sum(rate(node_cpu_seconds_total{mode!=\"idle\"}[5m])) + sum(rate(node_memory_Active_bytes[5m]))",
+            start:                now,
+            end:                  now,
+            expectedFragmentsCnt: 3,
+        },
+        {
+            name:                 "multiple binary operation with aggregations",
+            query:                "sum(rate(http_requests_total{job=\"api\"}[5m])) + sum(rate(http_requests_total{job=\"web\"}[5m])) + sum(rate(http_requests_total{job=\"cache\"}[5m]))",
+            start:                now,
+            end:                  now,
+            expectedFragmentsCnt: 5,
+        },
+        {
+            name:                 "multiple binary operation with aggregations",
+            query:                "sum(rate(http_requests_total{job=\"api\"}[5m])) + sum(rate(http_requests_total{job=\"web\"}[5m])) + sum(rate(http_requests_total{job=\"cache\"}[5m])) + sum(rate(http_requests_total{job=\"db\"}[5m]))",
+            start:                now,
+            end:                  now,
+            expectedFragmentsCnt: 7,
         },
     }
 
     for _, tc := range tests {
         t.Run(tc.name, func(t *testing.T) {
-            lp, err := logical_plan.CreateTestLogicalPlan(tc.query, tc.start, tc.end, 0)
+            lp, err := distributed_execution.CreateTestLogicalPlan(tc.query, tc.start, tc.end, 0)
             require.NoError(t, err)
 
-            fragmenter := NewDummyFragmenter()
-            res, err := fragmenter.Fragment((*lp).Root())
+            fragmenter := NewPlanFragmenter()
+            res, err := fragmenter.Fragment(uint64(1), (*lp).Root())
 
             require.NoError(t, err)
-            require.Equal(t, tc.expectedFragments, len(res))
+
+            // first check the number of fragments
+            require.Equal(t, tc.expectedFragmentsCnt, len(res))
+
+            // check the fragments returned by comparing child IDs, ensuring correct hierarchy
+            if len(res) == 3 { // 3 fragment cases
+                // current binary split:
+                // (due to the design of the distributed optimizer)
+                //     2
+                //    / \
+                //   0   1
+                require.Empty(t, res[0].ChildIDs)
+                require.Empty(t, res[1].ChildIDs)
+                require.Equal(t, []uint64{res[0].FragmentID, res[1].FragmentID}, res[2].ChildIDs)
+
+            } else if len(res) == 5 {
+                // current binary split:
+                //       4
+                //      / \
+                //     2   3
+                //    / \
+                //   0   1
+                require.Empty(t, res[0].ChildIDs)
+                require.Empty(t, res[1].ChildIDs)
+                require.Empty(t, res[3].ChildIDs)
+
+                require.Containsf(t, res[2].ChildIDs, res[0].FragmentID, "child ID of fragment 0 not found in layer 2")
+                require.Containsf(t, res[2].ChildIDs, res[1].FragmentID, "child ID of fragment 1 not found in layer 2")
+                require.Equal(t, len(res[2].ChildIDs), 2) // binary check
+
+                require.Containsf(t, res[4].ChildIDs, res[3].FragmentID, "child ID of fragment 3 not found in layer 3")
+                require.Containsf(t, res[4].ChildIDs, res[2].FragmentID, "child ID of fragment 4 not found in layer 3")
+                require.Equal(t, len(res[4].ChildIDs), 2) // binary check
+
+            } else if len(res) == 7 { // 7 fragment cases
+                // current binary split:
+                //         6
+                //        / \
+                //       4   5
+                //      / \
+                //     2   3
+                //    / \
+                //   0   1
+
+                require.Empty(t, res[0].ChildIDs)
+                require.Empty(t, res[1].ChildIDs)
+                require.Empty(t, res[3].ChildIDs)
+                require.Empty(t, res[5].ChildIDs)
+
+                require.Containsf(t, res[2].ChildIDs, res[0].FragmentID, "child ID of fragment 0 not found in layer 2")
+                require.Containsf(t, res[2].ChildIDs, res[1].FragmentID, "child ID of fragment 1 not found in layer 2")
+                require.Equal(t, len(res[2].ChildIDs), 2) // binary check
+
+                require.Containsf(t, res[4].ChildIDs, res[3].FragmentID, "child ID of fragment 3 not found in layer 3")
+                require.Containsf(t, res[4].ChildIDs, res[2].FragmentID, "child ID of fragment 4 not found in layer 3")
+                require.Equal(t, len(res[4].ChildIDs), 2) // binary check
+
+                require.Containsf(t, res[6].ChildIDs, res[4].FragmentID, "child ID of fragment 4 not found in layer 4")
+                require.Containsf(t, res[6].ChildIDs, res[5].FragmentID, "child ID of fragment 5 not found in layer 4")
+                require.Equal(t, len(res[6].ChildIDs), 2) // binary check
+            }
         })
     }
 }
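
The expected counts above are consistent with PromQL's left-associative "+": a query summing N aggregated operands parses into N-1 binary nodes, each of which gets both operands wrapped in Remote nodes, giving 2(N-1) remote fragments plus one root fragment, i.e. 2N-1 in total (N=2 -> 3, N=3 -> 5, N=4 -> 7), matching the trees drawn in the test comments. As the note in the test says, these counts follow the current optimizer strategy and would change if it does.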

pkg/util/logical_plan/test_logicalplan_utils.go renamed to pkg/distributed_execution/test_logicalplan_utils.go

Lines changed: 2 additions & 2 deletions
@@ -1,4 +1,4 @@
-package logical_plan
+package distributed_execution
 
 import (
     "time"
@@ -44,7 +44,7 @@ func CreateTestLogicalPlan(qs string, start time.Time, end time.Time, step time.
     if err != nil {
         return nil, err
     }
-    optimizedPlan, _ := logicalPlan.Optimize(logicalplan.DefaultOptimizers)
+    optimizedPlan, _ := logicalPlan.Optimize(append(logicalplan.DefaultOptimizers, &DistributedOptimizer{}))
 
     return &optimizedPlan, nil
 }
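
The removed helper in distributed_optimizer_test.go above shows the full recipe this renamed file wraps; with this commit the DistributedOptimizer is simply appended to the default optimizer chain, so every test plan already contains Remote nodes before fragmentation. A condensed sketch under that assumption, written as if inside package distributed_execution with the same imports (parser, query, logicalplan) as the removed helper; buildDistributedPlan is a hypothetical name and the option values are copied from that helper:

    func buildDistributedPlan(qs string, start, end time.Time, step time.Duration) (*logicalplan.Plan, error) {
        qOpts := query.Options{
            Start:                    start,
            End:                      end,
            Step:                     step,
            StepsBatch:               10,
            NoStepSubqueryIntervalFn: func(time.Duration) time.Duration { return 0 },
        }

        expr, err := parser.NewParser(qs, parser.WithFunctions(parser.Functions)).ParseExpr()
        if err != nil {
            return nil, err
        }

        plan, err := logicalplan.NewFromAST(expr, &qOpts, logicalplan.PlanOptions{})
        if err != nil {
            return nil, err
        }

        // the distributed optimizer runs after the defaults and inserts the Remote nodes
        optimized, _ := plan.Optimize(append(logicalplan.DefaultOptimizers, &DistributedOptimizer{}))
        return &optimized, nil
    }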

pkg/scheduler/scheduler.go

Lines changed: 2 additions & 2 deletions
@@ -125,7 +125,7 @@ func NewScheduler(cfg Config, limits Limits, log log.Logger, registerer promethe
         connectedFrontends: map[string]*connectedFrontend{},
 
         fragmentTable:          fragment_table.NewFragmentTable(2 * time.Minute),
-        fragmenter:             plan_fragments.NewDummyFragmenter(),
+        fragmenter:             plan_fragments.NewPlanFragmenter(),
         distributedExecEnabled: distributedExecEnabled,
         queryFragmentRegistry:  map[queryKey][]uint64{},
     }
@@ -351,7 +351,7 @@ func (s *Scheduler) fragmentAndEnqueueRequest(frontendContext context.Context, f
         return err
     }
 
-    fragments, err := s.fragmenter.Fragment(lpNode)
+    fragments, err := s.fragmenter.Fragment(msg.QueryID, lpNode)
     if err != nil {
        return err
     }
