Commit bf4e9c5

fix(dispatch): reduce locking contention
Reduce the amount of time spent holding locks in the dispatcher:

- doMaintenance() holds RW lock only during deletion
- Groups() holds R lock, copies, releases
- processAlert() holds R(W) locks only when necessary

This results in -70% maintenance overhead or +19518.99% alert processing rate improvement:

```
                                                  │ bench-dispatch-main.txt │ bench-dispatch-fix-locking.txt  │
                                                  │   baseline_alerts/sec   │ baseline_alerts/sec    vs base  │
Dispatch_100k_AggregationGroups_10k_Empty-12              1.899M ±  1%          1.919M ±  2%          ~ (p=0.063 n=10)
Dispatch_100k_AggregationGroups_20k_Empty-12              1.934M ±  2%          1.967M ±  1%     +1.69% (p=0.015 n=10)
Dispatch_100k_AggregationGroups_30k_Empty-12              1.926M ±  3%          1.931M ±  4%          ~ (p=0.436 n=10)
Dispatch_100k_AggregationGroups_40k_Empty-12              2.087M ± 10%          2.030M ±  7%          ~ (p=0.912 n=10)
Dispatch_100k_AggregationGroups_50k_Empty-12              1.922M ±  3%          2.118M ± 10%    +10.16% (p=0.000 n=10)
Dispatch_20k_AggregationGroups_Groups_Impact-12           180.7k ± 42%         2128.6k ±  4%  +1077.93% (p=0.000 n=10)
Dispatch_50k_AggregationGroups_Groups_Impact-12           57.02k ± 196%        2089.55k ±  2%  +3564.69% (p=0.000 n=10)
Dispatch_100k_AggregationGroups_Groups_Impact-12          19.61k ± 35%         1899.27k ±  2%  +9582.74% (p=0.000 n=10)
geomean                                                   524.6k               2.008M          +282.84%

                                                  │ bench-dispatch-main.txt │ bench-dispatch-fix-locking.txt  │
                                                  │ maintenance_overhead_%  │ maintenance_overhead_%  vs base │
Dispatch_100k_AggregationGroups_10k_Empty-12              17.185 ±  7%           5.672 ± 45%    -66.99% (p=0.000 n=10)
Dispatch_100k_AggregationGroups_20k_Empty-12               36.50 ± 11%           12.56 ± 14%    -65.59% (p=0.000 n=10)
Dispatch_100k_AggregationGroups_30k_Empty-12               55.44 ± 13%           23.11 ± 30%    -58.32% (p=0.000 n=10)
Dispatch_100k_AggregationGroups_40k_Empty-12              125.65 ± 27%           28.98 ± 53%    -76.94% (p=0.000 n=10)
Dispatch_100k_AggregationGroups_50k_Empty-12              172.40 ± 36%           37.76 ± 82%    -78.10% (p=0.000 n=10)
geomean                                                    59.62                 17.83          -70.10%
```

Signed-off-by: Siavash Safi <[email protected]>
1 parent 38370e7 commit bf4e9c5
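All three changes in the commit message share one shape: iterate under a read lock (or over a snapshot) and take the write lock only for the actual mutation. As a minimal, generic illustration of that collect-then-delete pattern, using hypothetical `store`/`item` types rather than this commit's code, it might look like the sketch below; the real change in dispatch/dispatch.go further down applies the same idea through the new routeGroups/fingerprintGroups wrappers.

```go
package main

import "sync"

type item struct{ empty bool }

// store is a hypothetical RWMutex-guarded map, used only to illustrate the
// collect-then-delete pattern described in the commit message.
type store struct {
	mtx   sync.RWMutex
	items map[string]*item
}

func (s *store) maintain() {
	// Pass 1: collect deletion candidates under a short-lived read lock so
	// concurrent readers are never blocked and writers wait only briefly.
	var toRemove []string
	s.mtx.RLock()
	for k, it := range s.items {
		if it.empty {
			toRemove = append(toRemove, k)
		}
	}
	s.mtx.RUnlock()

	// Pass 2: take the write lock only for the deletions themselves,
	// re-checking each candidate in case it changed in between.
	s.mtx.Lock()
	for _, k := range toRemove {
		if it, ok := s.items[k]; ok && it.empty {
			delete(s.items, k)
		}
	}
	s.mtx.Unlock()
}

func main() {
	s := &store{items: map[string]*item{"a": {empty: true}, "b": {}}}
	s.maintain() // "a" is removed, "b" survives
}
```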

4 files changed (+495, -62 lines)


dispatch/dispatch.go

Lines changed: 65 additions & 54 deletions
```diff
@@ -19,6 +19,7 @@ import (
     "log/slog"
     "sort"
     "sync"
+    "sync/atomic"
     "time"
 
     "github.com/prometheus/common/model"
@@ -41,8 +42,7 @@ type Dispatcher struct {
     timeout func(time.Duration) time.Duration
 
     mtx                sync.RWMutex
-    aggrGroupsPerRoute map[*Route]map[model.Fingerprint]*aggrGroup
-    aggrGroupsNum      int
+    aggrGroupsPerRoute routeGroups
 
     done chan struct{}
     ctx  context.Context
@@ -84,8 +84,10 @@ func (d *Dispatcher) Run() {
     d.done = make(chan struct{})
 
     d.mtx.Lock()
-    d.aggrGroupsPerRoute = map[*Route]map[model.Fingerprint]*aggrGroup{}
-    d.aggrGroupsNum = 0
+    d.aggrGroupsPerRoute = routeGroups{
+        groupsNum: &atomic.Int64{},
+        limits:    d.limits,
+    }
     d.metrics.aggrGroups.Set(0)
     d.ctx, d.cancel = context.WithCancel(context.Background())
     d.mtx.Unlock()
@@ -134,40 +136,58 @@ func (d *Dispatcher) run(it provider.AlertIterator) {
 }
 
 func (d *Dispatcher) doMaintenance() {
-    d.mtx.Lock()
-    defer d.mtx.Unlock()
-    for _, groups := range d.aggrGroupsPerRoute {
-        for _, ag := range groups {
+    type groupToRemove struct {
+        route *Route
+        fp    model.Fingerprint
+        ag    *aggrGroup
+    }
+
+    var toRemove []groupToRemove
+
+    // First pass: collect groups to remove
+    d.aggrGroupsPerRoute.Range(func(route *Route, groups *fingerprintGroups) bool {
+        groups.Range(func(fp model.Fingerprint, ag *aggrGroup) bool {
             if ag.empty() {
-                ag.stop()
-                d.marker.DeleteByGroupKey(ag.routeID, ag.GroupKey())
-                delete(groups, ag.fingerprint())
-                d.aggrGroupsNum--
-                d.metrics.aggrGroups.Dec()
+                toRemove = append(toRemove, groupToRemove{route, fp, ag})
             }
+            return true
+        })
+        return true
+    })
+
+    // Second pass: remove collected groups
+    for _, item := range toRemove {
+        item.ag.stop()
+        d.marker.DeleteByGroupKey(item.ag.routeID, item.ag.GroupKey())
+        groupsMap := d.aggrGroupsPerRoute.getRoute(item.route)
+        if groupsMap != nil {
+            groupsMap.removeGroup(item.fp)
+            d.metrics.aggrGroups.Dec()
         }
     }
 }
 
 // Groups returns a slice of AlertGroups from the dispatcher's internal state.
 func (d *Dispatcher) Groups(routeFilter func(*Route) bool, alertFilter func(*types.Alert, time.Time) bool) (AlertGroups, map[model.Fingerprint][]string) {
-    groups := AlertGroups{}
-
-    d.mtx.RLock()
-    defer d.mtx.RUnlock()
+    // Snapshot the outer map in routeGroups to
+    // avoid holding the read lock when dispatcher has 1000s of aggregation groups.
+    routeGroups := routeGroups{}
+    d.aggrGroupsPerRoute.Range(func(route *Route, groups *fingerprintGroups) bool {
+        routeGroups.addRoute(route)
+        return true
+    })
 
-    // Keep a list of receivers for an alert to prevent checking each alert
-    // again against all routes. The alert has already matched against this
-    // route on ingestion.
+    // TODO: move this processing out of Dispatcher, it does not belong here.
+    alertGroups := AlertGroups{}
     receivers := map[model.Fingerprint][]string{}
-
     now := time.Now()
-    for route, ags := range d.aggrGroupsPerRoute {
+    routeGroups.Range(func(route *Route, _ *fingerprintGroups) bool {
         if !routeFilter(route) {
-            continue
+            return true
         }
 
-        for _, ag := range ags {
+        // Read inner fingerprintGroups if necessary.
+        d.aggrGroupsPerRoute.getRoute(route).Range(func(fp model.Fingerprint, ag *aggrGroup) bool {
             receiver := route.RouteOpts.Receiver
             alertGroup := &AlertGroup{
                 Labels: ag.labels,
@@ -197,22 +217,24 @@ func (d *Dispatcher) Groups(routeFilter func(*Route) bool, alertFilter func(*typ
                 filteredAlerts = append(filteredAlerts, a)
             }
             if len(filteredAlerts) == 0 {
-                continue
+                return true
             }
             alertGroup.Alerts = filteredAlerts
 
-            groups = append(groups, alertGroup)
-        }
-    }
-    sort.Sort(groups)
-    for i := range groups {
-        sort.Sort(groups[i].Alerts)
+            alertGroups = append(alertGroups, alertGroup)
+            return true
+        })
+        return true
+    })
+    sort.Sort(alertGroups)
+    for i := range alertGroups {
+        sort.Sort(alertGroups[i].Alerts)
     }
     for i := range receivers {
        sort.Strings(receivers[i])
     }
 
-    return groups, receivers
+    return alertGroups, receivers
 }
 
 // Stop the dispatcher.
@@ -236,42 +258,31 @@ func (d *Dispatcher) Stop() {
 // and inserts it.
 func (d *Dispatcher) processAlert(alert *types.Alert, route *Route) {
     groupLabels := getGroupLabels(alert, route)
-
     fp := groupLabels.Fingerprint()
 
-    d.mtx.Lock()
-    defer d.mtx.Unlock()
-
-    routeGroups, ok := d.aggrGroupsPerRoute[route]
-    if !ok {
-        routeGroups = map[model.Fingerprint]*aggrGroup{}
-        d.aggrGroupsPerRoute[route] = routeGroups
-    }
-
-    ag, ok := routeGroups[fp]
-    if ok {
-        ag.insert(alert)
+    routeGroups := d.aggrGroupsPerRoute.addRoute(route)
+    group := routeGroups.getGroup(fp)
+    if group != nil {
+        group.insert(alert)
         return
     }
 
-    // If the group does not exist, create it. But check the limit first.
-    if limit := d.limits.MaxNumberOfAggregationGroups(); limit > 0 && d.aggrGroupsNum >= limit {
+    // If the group does not exist, create it.
+    group, count, limit := routeGroups.addGroup(fp, newAggrGroup(d.ctx, groupLabels, route, d.timeout, d.logger))
+    if group == nil {
+        // Rate limited.
         d.metrics.aggrGroupLimitReached.Inc()
-        d.logger.Error("Too many aggregation groups, cannot create new group for alert", "groups", d.aggrGroupsNum, "limit", limit, "alert", alert.Name())
+        d.logger.Error("Too many aggregation groups, cannot create new group for alert", "groups", count, "limit", limit)
         return
     }
-
-    ag = newAggrGroup(d.ctx, groupLabels, route, d.timeout, d.logger)
-    routeGroups[fp] = ag
-    d.aggrGroupsNum++
     d.metrics.aggrGroups.Inc()
 
     // Insert the 1st alert in the group before starting the group's run()
     // function, to make sure that when the run() will be executed the 1st
     // alert is already there.
-    ag.insert(alert)
+    group.insert(alert)
 
-    go ag.run(func(ctx context.Context, alerts ...*types.Alert) bool {
+    go group.run(func(ctx context.Context, alerts ...*types.Alert) bool {
        _, _, err := d.stage.Exec(ctx, d.logger, alerts...)
        if err != nil {
            logger := d.logger.With("num_alerts", len(alerts), "err", err)
```
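The diff above calls into `routeGroups` and `fingerprintGroups` wrappers whose definitions live in the other files changed by this commit and are not part of this excerpt. The sketch below is an assumption, inferred only from the call sites: each map guarded by its own `sync.RWMutex`, sharing the `atomic.Int64` counter and `Limits` value seen in `Run()`. Stub `Route`, `aggrGroup` and `Limits` types keep it self-contained, and `fingerprintGroups.Range` (used by `doMaintenance()` and `Groups()`) is omitted for brevity; it would follow the same read-locked shape as `getGroup`.

```go
// Illustrative sketch only: not the commit's actual code. The real wrappers are
// defined in the other changed files; everything here is inferred from dispatch.go.
package sketch

import (
	"sync"
	"sync/atomic"

	"github.com/prometheus/common/model"
)

// Stand-ins for the dispatch package's own types, to keep the sketch self-contained.
type (
	Route     struct{}
	aggrGroup struct{}
	Limits    interface{ MaxNumberOfAggregationGroups() int }
)

// routeGroups guards the outer route -> groups map with its own RWMutex.
type routeGroups struct {
	mtx       sync.RWMutex
	routes    map[*Route]*fingerprintGroups
	groupsNum *atomic.Int64 // total number of groups across all routes
	limits    Limits
}

func (rg *routeGroups) addRoute(route *Route) *fingerprintGroups {
	rg.mtx.Lock()
	defer rg.mtx.Unlock()
	if rg.routes == nil {
		rg.routes = map[*Route]*fingerprintGroups{}
	}
	if fg, ok := rg.routes[route]; ok {
		return fg
	}
	fg := &fingerprintGroups{groups: map[model.Fingerprint]*aggrGroup{}, groupsNum: rg.groupsNum, limits: rg.limits}
	rg.routes[route] = fg
	return fg
}

func (rg *routeGroups) getRoute(route *Route) *fingerprintGroups {
	rg.mtx.RLock()
	defer rg.mtx.RUnlock()
	return rg.routes[route]
}

// Range copies the route set under a read lock and invokes fn with no lock held,
// so slow callbacks cannot block writers.
func (rg *routeGroups) Range(fn func(*Route, *fingerprintGroups) bool) {
	rg.mtx.RLock()
	snap := make(map[*Route]*fingerprintGroups, len(rg.routes))
	for r, fg := range rg.routes {
		snap[r] = fg
	}
	rg.mtx.RUnlock()
	for r, fg := range snap {
		if !fn(r, fg) {
			return
		}
	}
}

// fingerprintGroups guards one route's fingerprint -> group map.
type fingerprintGroups struct {
	mtx       sync.RWMutex
	groups    map[model.Fingerprint]*aggrGroup
	groupsNum *atomic.Int64
	limits    Limits
}

func (fg *fingerprintGroups) getGroup(fp model.Fingerprint) *aggrGroup {
	fg.mtx.RLock()
	defer fg.mtx.RUnlock()
	return fg.groups[fp]
}

// addGroup stores ag unless the global limit is reached; it returns the stored
// group (nil when limited), the current group count and the configured limit.
func (fg *fingerprintGroups) addGroup(fp model.Fingerprint, ag *aggrGroup) (*aggrGroup, int64, int) {
	fg.mtx.Lock()
	defer fg.mtx.Unlock()
	limit := fg.limits.MaxNumberOfAggregationGroups()
	if n := fg.groupsNum.Load(); limit > 0 && n >= int64(limit) {
		return nil, n, limit
	}
	fg.groups[fp] = ag
	return ag, fg.groupsNum.Add(1), limit
}

func (fg *fingerprintGroups) removeGroup(fp model.Fingerprint) {
	fg.mtx.Lock()
	defer fg.mtx.Unlock()
	if _, ok := fg.groups[fp]; ok {
		delete(fg.groups, fp)
		fg.groupsNum.Add(-1)
	}
}
```

The property this sketch aims for is that `routeGroups.Range` copies the route set under a read lock and runs its callback unlocked, so long iterations in `Groups()` or `doMaintenance()` do not stall `processAlert()`; whatever the real implementation looks like, that is the behavior the call sites above rely on.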
