Skip to content

Commit 9c471e6

Browse files
Implement branch and bound
Signed-off-by: Dimitar Dimitrov <[email protected]>
1 parent d2a262a commit 9c471e6

File tree

4 files changed

+348
-109
lines changed

4 files changed

+348
-109
lines changed
Lines changed: 274 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,274 @@
1+
// SPDX-License-Identifier: AGPL-3.0-only
2+
3+
package lookupplan
4+
5+
import (
6+
"container/heap"
7+
"context"
8+
"iter"
9+
10+
"github.com/prometheus/prometheus/model/labels"
11+
"github.com/prometheus/prometheus/tsdb/index"
12+
13+
"github.com/grafana/mimir/pkg/storage/sharding"
14+
)
15+
16+
// partialPlan represents a plan where only some predicates have been decided.
17+
// Predicates are decided in order from 0 to len(predicates)-1.
18+
type partialPlan struct {
19+
plan
20+
21+
// lowerBoundCost is the value of LowerBoundCost() cached for efficiency.
22+
lowerBoundCost float64
23+
// numDecidedPredicates tracks how many predicates have been decided (0 to len(predicates)).
24+
// Predicates [0, numDecidedPredicates) have been decided.
25+
numDecidedPredicates int
26+
}
27+
28+
func partialPlanWithLowerBound(p plan, numDecided int) partialPlan {
29+
partial := partialPlan{
30+
plan: p,
31+
numDecidedPredicates: numDecided,
32+
}
33+
partial.lowerBoundCost = partial.LowerBoundCost()
34+
return partial
35+
}
36+
37+
func (p partialPlan) hasAnyIndexPredicate() bool {
38+
for _, useIndex := range p.indexPredicate {
39+
if useIndex {
40+
return true
41+
}
42+
}
43+
return false
44+
}
45+
46+
func (p partialPlan) LowerBoundCost() float64 {
47+
return p.indexLookupCost() + p.intersectionCost() + p.seriesRetrievalCost() + p.filterCost()
48+
}
49+
50+
// indexLookupCost returns the cost of performing index lookups for all predicates that use the index
51+
func (p partialPlan) indexLookupCost() float64 {
52+
cost := 0.0
53+
for i := range p.predicates {
54+
pr, ok := p.virtualPredicate(i)
55+
if !ok {
56+
continue
57+
}
58+
59+
cost += pr.indexLookupCost()
60+
}
61+
return cost
62+
}
63+
64+
// virtualPredicate returns the predicate at idx and whether it's an index predicate.
65+
// For undecided predicates:
66+
// - The first undecided predicate is treated as an index predicate for lower bound calculation
67+
// - All other undecided predicates are treated as scan predicates with minimal cost
68+
// This goal of virtual undecided predicates is to minimize the cost of the whole plan.
69+
func (p partialPlan) virtualPredicate(idx int) (planPredicate, bool) {
70+
if idx < p.numDecidedPredicates {
71+
return p.predicates[idx], p.indexPredicate[idx]
72+
}
73+
74+
virtualPred := p.predicates[idx]
75+
// Very cheap single match cost, but still non-zero so that there is a difference between using index and not using index for a predicate.
76+
virtualPred.singleMatchCost = 1
77+
// Don't assume 0 cardinality because that might make the whole plan have 0 cardinality which is unrealistic.
78+
virtualPred.cardinality = 1
79+
// Don't assume 0 unique label values because that might make the whole plan have 0 cardinality which is unrealistic.
80+
virtualPred.labelNameUniqueVals = 1
81+
// We don't want selectivity of 0 because then the cost of the rest of the predicates might not matter.
82+
virtualPred.selectivity = 1
83+
// Assume extremely cheap index scan cost.
84+
virtualPred.indexScanCost = 1
85+
86+
return virtualPred, idx == p.numDecidedPredicates
87+
}
88+
89+
// intersectionCost returns the cost of intersecting posting lists from multiple index predicates
90+
// This includes retrieving the series' labels from the index.
91+
func (p partialPlan) intersectionCost() float64 {
92+
iteratedPostings := uint64(0)
93+
for i := range p.predicates {
94+
pred, ok := p.virtualPredicate(i)
95+
if !ok {
96+
continue
97+
}
98+
99+
iteratedPostings += pred.cardinality
100+
}
101+
102+
return float64(iteratedPostings) * p.config.RetrievedPostingCost
103+
}
104+
105+
// seriesRetrievalCost returns the cost of retrieving series from the index after intersecting posting lists.
106+
// This includes retrieving the series' labels from the index and checking if the series belongs to the query's shard.
107+
// Realistically we don't retrieve every series because we have the series hash cache, but we ignore that for simplicity.
108+
func (p partialPlan) seriesRetrievalCost() float64 {
109+
return float64(p.NumSelectedPostings()) * p.config.RetrievedSeriesCost
110+
}
111+
112+
// filterCost returns the cost of applying scan predicates to the fetched series.
113+
// The sequence is: intersection → retrieve series → check shard → apply scan matchers.
114+
func (p partialPlan) filterCost() float64 {
115+
cost := 0.0
116+
seriesToFilter := p.numSelectedPostingsInOurShard()
117+
for i := range p.predicates {
118+
// In reality, we will apply all the predicates for each series and stop once one predicate doesn't match.
119+
// But we calculate for the worst case where we have to run all predicates for all series.
120+
pred, ok := p.virtualPredicate(i)
121+
if ok {
122+
continue
123+
}
124+
125+
cost += pred.filterCost(seriesToFilter)
126+
}
127+
return cost
128+
}
129+
130+
func (p partialPlan) numSelectedPostingsInOurShard() uint64 {
131+
return shardedCardinality(p.NumSelectedPostings(), p.shard)
132+
}
133+
134+
func (p partialPlan) NumSelectedPostings() uint64 {
135+
finalSelectivity := 1.0
136+
for i := range p.predicates {
137+
pred, ok := p.virtualPredicate(i)
138+
if !ok {
139+
continue
140+
}
141+
142+
// We use the selectivity across all series instead of the selectivity across label values.
143+
// For example, if {protocol=~.*} matches all values, it doesn't mean it won't reduce the result set after intersection.
144+
//
145+
// We also assume independence between the predicates. This is a simplification.
146+
// For example, the selectivity of {pod=~prometheus.*} doesn't depend on if we have already applied {statefulset=prometheus}.
147+
// While finalSelectivity is neither an upper bound nor a lower bound, assuming independence allows us to come up with cost estimates comparable between plans.
148+
finalSelectivity *= float64(pred.cardinality) / float64(p.totalSeries)
149+
}
150+
return uint64(finalSelectivity * float64(p.totalSeries))
151+
}
152+
153+
// nonShardedCardinality returns an estimate of the total number of series before query sharding is applied.
154+
// This is the base cardinality considering only the selectivity of all predicates.
155+
func (p partialPlan) nonShardedCardinality() uint64 {
156+
finalSelectivity := 1.0
157+
for i := range p.predicates {
158+
pred, _ := p.virtualPredicate(i)
159+
// We use the selectivity across all series instead of the selectivity across label values.
160+
// For example, if {protocol=~.*} matches all values, it could still reduce the result set after intersection.
161+
//
162+
// We also assume independence between the predicates. This is a simplification.
163+
// For example, the selectivity of {pod=~prometheus.*} doesn't depend on if we have already applied {statefulset=prometheus}.
164+
finalSelectivity *= float64(pred.cardinality) / float64(p.totalSeries)
165+
}
166+
return uint64(finalSelectivity * float64(p.totalSeries))
167+
}
168+
169+
// FinalCardinality returns an estimate of the total number of series that this plan would return.
170+
func (p partialPlan) FinalCardinality() uint64 {
171+
return shardedCardinality(p.nonShardedCardinality(), p.shard)
172+
}
173+
174+
// partialPlans implements heap.Interface for a min-heap of partial plans ordered by lower bound.
175+
type partialPlans []partialPlan
176+
177+
func (pq partialPlans) Len() int { return len(pq) }
178+
179+
func (pq partialPlans) Less(i, j int) bool {
180+
return pq[i].lowerBoundCost < pq[j].lowerBoundCost
181+
}
182+
183+
func (pq partialPlans) Swap(i, j int) {
184+
pq[i], pq[j] = pq[j], pq[i]
185+
}
186+
187+
func (pq *partialPlans) Push(x interface{}) {
188+
*pq = append(*pq, x.(partialPlan))
189+
}
190+
191+
func (pq *partialPlans) Pop() interface{} {
192+
old := *pq
193+
n := len(old)
194+
item := old[n-1]
195+
*pq = old[0 : n-1]
196+
return item
197+
}
198+
199+
func (pq partialPlans) Iterator() iter.Seq[plan] {
200+
return func(f func(plan) bool) {
201+
for _, p := range pq {
202+
if !f(p.plan) {
203+
return
204+
}
205+
}
206+
}
207+
}
208+
209+
// generatePlansBranchAndBound uses branch-and-bound to explore the space of possible plans.
210+
// It prunes branches that cannot possibly lead to a better plan than the current best.
211+
func (p CostBasedPlanner) generatePlansBranchAndBound(ctx context.Context, statistics index.Statistics, matchers []*labels.Matcher, pools *costBasedPlannerPools, shard *sharding.ShardSelector) iter.Seq[plan] {
212+
// Initialize priority queue with the root partial plan (all predicates undecided)
213+
prospectPlans := pools.GetPartialPlans(maxPlansForPlanning)
214+
scanOnlyPlan := newScanOnlyPlan(ctx, statistics, p.config, matchers, pools.indexPredicatesPool, shard)
215+
heap.Push(prospectPlans, partialPlanWithLowerBound(scanOnlyPlan, 0))
216+
217+
completePlans := pools.GetPartialPlans(maxPlansForPlanning)
218+
bestCompleteCost := float64(1<<63 - 1) // Start with max float64
219+
numPredicates := len(scanOnlyPlan.predicates)
220+
221+
for i := maxPlansForPlanning; prospectPlans.Len() > 0 && i > 0; i-- {
222+
current := heap.Pop(prospectPlans).(partialPlan)
223+
224+
// Prune: if lower bound is worse than best complete plan, skip this branch
225+
if current.lowerBoundCost >= bestCompleteCost {
226+
continue
227+
}
228+
229+
// Check if this is a complete plan (all predicates decided)
230+
if current.numDecidedPredicates == numPredicates {
231+
if !current.hasAnyIndexPredicate() {
232+
// We only want plans with at least one index predicate here.
233+
// Plans without index predicates will return no postings.
234+
// This means we should also not use scan-only plans for pruning because their low cost is not a cost we can actually achieve.
235+
continue
236+
}
237+
actualCost := current.plan.TotalCost()
238+
current.lowerBoundCost = actualCost
239+
heap.Push(completePlans, current)
240+
241+
// Update best complete cost for pruning
242+
if actualCost < bestCompleteCost {
243+
bestCompleteCost = actualCost
244+
}
245+
continue
246+
}
247+
248+
// Branch: create children by deciding the next undecided predicate
249+
nextPredicateIdx := current.numDecidedPredicates
250+
251+
indexChild := current.plan.UseIndexFor(nextPredicateIdx)
252+
heap.Push(prospectPlans, partialPlanWithLowerBound(indexChild, nextPredicateIdx+1))
253+
heap.Push(prospectPlans, partialPlanWithLowerBound(current.plan, nextPredicateIdx+1))
254+
}
255+
256+
// Fall back to index-only plan to ensure that our code doesn't choose a more expensive plan than the naive plan.
257+
indexOnlyPlan := newIndexOnlyPlan(ctx, statistics, p.config, matchers, pools.indexPredicatesPool, shard)
258+
heap.Push(completePlans, partialPlanWithLowerBound(indexOnlyPlan, numPredicates))
259+
260+
// Push all plans from the smaller heap into the larger one
261+
// We need this because we will need to find a plan with at least one index matcher later,
262+
// and we might not find that in either of the heaps alone.
263+
return mergePlans(completePlans, prospectPlans).Iterator()
264+
}
265+
266+
func mergePlans(completePlans, prospectPlans *partialPlans) *partialPlans {
267+
for prospectPlans.Len() > 0 {
268+
p := heap.Pop(prospectPlans).(partialPlan)
269+
// At this point we'll be choosing the cheapest plan. we shouldn't be considering the lower bound as the cost of the plan.
270+
p.lowerBoundCost = p.plan.TotalCost()
271+
heap.Push(completePlans, p)
272+
}
273+
return completePlans
274+
}

pkg/ingester/lookupplan/plan.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,15 @@ func newScanOnlyPlan(ctx context.Context, stats index.Statistics, config CostCon
5151
return p
5252
}
5353

54+
// TODO dimitarvdimitrov use this
55+
func newIndexOnlyPlan(ctx context.Context, stats index.Statistics, config CostConfig, matchers []*labels.Matcher, predicatesPool *pool.SlabPool[bool], shard *sharding.ShardSelector) plan {
56+
p := newScanOnlyPlan(ctx, stats, config, matchers, predicatesPool, shard)
57+
for i := range p.indexPredicate {
58+
p.indexPredicate[i] = true
59+
}
60+
return p
61+
}
62+
5463
func (p plan) IndexMatchers() []*labels.Matcher {
5564
var matchers []*labels.Matcher
5665
for i, pred := range p.predicates {

0 commit comments

Comments
 (0)