Skip to content

Commit 4520036

Browse files
committed
fix(dispatch): reduce locking contention
Reduce the amount of time spent holding locks in the dispatcher by using the new data structures: - doMaintenance() snapshots empty groups and deletes them afterwards - Groups() snapshots routes and queries groups per route on demand - processAlert() only locks the group it is processing an alert for None of the above 3 methods hold any locks directly. This results in -68% maintenance overhead or +12991% alert processing rate improvement: ``` goos: darwin goarch: arm64 pkg: github.com/prometheus/alertmanager/dispatch cpu: Apple M3 Pro │ bench-dispatch-main.txt │ bench-dispatch-nested-map-locks.txt │ │ sec/op │ sec/op vs base │ Dispatch_100k_AggregationGroups_10k_Empty-12 1.242µ ± 1% 1.193µ ± 5% -3.91% (p=0.037 n=10) Dispatch_100k_AggregationGroups_20k_Empty-12 1.329µ ± 2% 1.188µ ± 3% -10.65% (p=0.000 n=10) Dispatch_100k_AggregationGroups_30k_Empty-12 1.437µ ± 3% 1.288µ ± 7% -10.37% (p=0.001 n=10) Dispatch_100k_AggregationGroups_40k_Empty-12 1.695µ ± 8% 1.236µ ± 6% -27.06% (p=0.000 n=10) Dispatch_100k_AggregationGroups_50k_Empty-12 2.185µ ± 11% 1.286µ ± 11% -41.17% (p=0.000 n=10) Dispatch_20k_AggregationGroups_Groups_Impact-12 189007.708µ ± 6% 1.143µ ± 18% -100.00% (p=0.000 n=10) Dispatch_50k_AggregationGroups_Groups_Impact-12 602.17m ± 69% 28.09m ± 605% -95.33% (p=0.000 n=10) Dispatch_100k_AggregationGroups_Groups_Impact-12 1.543 ± 29% 1.272 ± 96% ~ (p=0.247 n=10) geomean 187.7µ 24.22µ -87.10% │ bench-dispatch-main.txt │ bench-dispatch-nested-map-locks.txt │ │ alerts/sec │ alerts/sec vs base │ Dispatch_100k_AggregationGroups_10k_Empty-12 1.616M ± 1% 1.767M ± 3% +9.36% (p=0.000 n=10) Dispatch_100k_AggregationGroups_20k_Empty-12 1.412M ± 2% 1.715M ± 1% +21.45% (p=0.000 n=10) Dispatch_100k_AggregationGroups_30k_Empty-12 1.235M ± 4% 1.637M ± 4% +32.58% (p=0.002 n=10) Dispatch_100k_AggregationGroups_40k_Empty-12 950.9k ± 14% 1614.0k ± 8% +69.73% (p=0.000 n=10) Dispatch_100k_AggregationGroups_50k_Empty-12 693.5k ± 18% 1370.7k ± 17% +97.65% (p=0.000 n=10) Dispatch_20k_AggregationGroups_Groups_Impact-12 5.586 ± 14% 2027690.000 ± 5% +36299398.75% (p=0.000 n=10) Dispatch_50k_AggregationGroups_Groups_Impact-12 3.277 ± 40% 742918.500 ± 167% +22667131.12% (p=0.000 n=10) Dispatch_100k_AggregationGroups_Groups_Impact-12 1.424 ± 4255939% 252756.500 ± 73% +17749654.21% (p=0.000 n=10) geomean 9.134k 1.196M +12991.42% │ bench-dispatch-main.txt │ bench-dispatch-nested-map-locks.txt │ │ maintenance_overhead_% │ maintenance_overhead_% vs base │ Dispatch_100k_AggregationGroups_10k_Empty-12 17.185 ± 7% 5.905 ± ? -65.64% (p=0.002 n=10) Dispatch_100k_AggregationGroups_20k_Empty-12 36.50 ± 11% 13.66 ± 26% -62.58% (p=0.000 n=10) Dispatch_100k_AggregationGroups_30k_Empty-12 55.44 ± 13% 17.45 ± 26% -68.52% (p=0.002 n=10) Dispatch_100k_AggregationGroups_40k_Empty-12 125.65 ± 27% 27.03 ± 34% -78.49% (p=0.000 n=10) Dispatch_100k_AggregationGroups_50k_Empty-12 172.40 ± 36% 64.64 ± 60% -62.51% (p=0.000 n=10) geomean 59.62 18.97 -68.17% ``` Signed-off-by: Siavash Safi <[email protected]>
1 parent 689b82a commit 4520036

File tree

3 files changed

+490
-58
lines changed

3 files changed

+490
-58
lines changed

dispatch/dispatch.go

Lines changed: 66 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ import (
2121
"sync"
2222
"time"
2323

24+
"go.uber.org/atomic"
25+
2426
"github.com/prometheus/common/model"
2527

2628
"github.com/prometheus/alertmanager/notify"
@@ -41,8 +43,7 @@ type Dispatcher struct {
4143
timeout func(time.Duration) time.Duration
4244

4345
mtx sync.RWMutex
44-
aggrGroupsPerRoute map[*Route]map[model.Fingerprint]*aggrGroup
45-
aggrGroupsNum int
46+
aggrGroupsPerRoute routeGroups
4647

4748
done chan struct{}
4849
ctx context.Context
@@ -84,8 +85,10 @@ func (d *Dispatcher) Run() {
8485
d.done = make(chan struct{})
8586

8687
d.mtx.Lock()
87-
d.aggrGroupsPerRoute = map[*Route]map[model.Fingerprint]*aggrGroup{}
88-
d.aggrGroupsNum = 0
88+
d.aggrGroupsPerRoute = routeGroups{
89+
groupsNum: &atomic.Int64{},
90+
limits: d.limits,
91+
}
8992
d.metrics.aggrGroups.Set(0)
9093
d.ctx, d.cancel = context.WithCancel(context.Background())
9194
d.mtx.Unlock()
@@ -134,40 +137,58 @@ func (d *Dispatcher) run(it provider.AlertIterator) {
134137
}
135138

136139
func (d *Dispatcher) doMaintenance() {
137-
d.mtx.Lock()
138-
defer d.mtx.Unlock()
139-
for _, groups := range d.aggrGroupsPerRoute {
140-
for _, ag := range groups {
140+
type groupToRemove struct {
141+
route *Route
142+
fp model.Fingerprint
143+
ag *aggrGroup
144+
}
145+
146+
var toRemove []groupToRemove
147+
148+
// First pass: collect groups to remove
149+
d.aggrGroupsPerRoute.Range(func(route *Route, groups *fingerprintGroups) bool {
150+
groups.Range(func(fp model.Fingerprint, ag *aggrGroup) bool {
141151
if ag.empty() {
142-
ag.stop()
143-
d.marker.DeleteByGroupKey(ag.routeID, ag.GroupKey())
144-
delete(groups, ag.fingerprint())
145-
d.aggrGroupsNum--
146-
d.metrics.aggrGroups.Dec()
152+
toRemove = append(toRemove, groupToRemove{route, fp, ag})
147153
}
154+
return true
155+
})
156+
return true
157+
})
158+
159+
// Second pass: remove collected groups
160+
for _, item := range toRemove {
161+
item.ag.stop()
162+
d.marker.DeleteByGroupKey(item.ag.routeID, item.ag.GroupKey())
163+
groupsMap := d.aggrGroupsPerRoute.GetRoute(item.route)
164+
if groupsMap != nil {
165+
groupsMap.RemoveGroup(item.fp)
166+
d.metrics.aggrGroups.Dec()
148167
}
149168
}
150169
}
151170

152171
// Groups returns a slice of AlertGroups from the dispatcher's internal state.
153172
func (d *Dispatcher) Groups(routeFilter func(*Route) bool, alertFilter func(*types.Alert, time.Time) bool) (AlertGroups, map[model.Fingerprint][]string) {
154-
groups := AlertGroups{}
155-
156-
d.mtx.RLock()
157-
defer d.mtx.RUnlock()
173+
// Snapshot the outer map in routeGroups to
174+
// avoid holding the read lock when dispatcher has 1000s of aggregation groups.
175+
routeGroups := routeGroups{}
176+
d.aggrGroupsPerRoute.Range(func(route *Route, groups *fingerprintGroups) bool {
177+
routeGroups.AddRoute(route)
178+
return true
179+
})
158180

159-
// Keep a list of receivers for an alert to prevent checking each alert
160-
// again against all routes. The alert has already matched against this
161-
// route on ingestion.
181+
// TODO: move this processing out of Dispatcher, it does not belong here.
182+
alertGroups := AlertGroups{}
162183
receivers := map[model.Fingerprint][]string{}
163-
164184
now := time.Now()
165-
for route, ags := range d.aggrGroupsPerRoute {
185+
routeGroups.Range(func(route *Route, _ *fingerprintGroups) bool {
166186
if !routeFilter(route) {
167-
continue
187+
return true
168188
}
169189

170-
for _, ag := range ags {
190+
// Read inner fingerprintGroups if necessary.
191+
d.aggrGroupsPerRoute.GetRoute(route).Range(func(fp model.Fingerprint, ag *aggrGroup) bool {
171192
receiver := route.RouteOpts.Receiver
172193
alertGroup := &AlertGroup{
173194
Labels: ag.labels,
@@ -197,22 +218,24 @@ func (d *Dispatcher) Groups(routeFilter func(*Route) bool, alertFilter func(*typ
197218
filteredAlerts = append(filteredAlerts, a)
198219
}
199220
if len(filteredAlerts) == 0 {
200-
continue
221+
return true
201222
}
202223
alertGroup.Alerts = filteredAlerts
203224

204-
groups = append(groups, alertGroup)
205-
}
206-
}
207-
sort.Sort(groups)
208-
for i := range groups {
209-
sort.Sort(groups[i].Alerts)
225+
alertGroups = append(alertGroups, alertGroup)
226+
return true
227+
})
228+
return true
229+
})
230+
sort.Sort(alertGroups)
231+
for i := range alertGroups {
232+
sort.Sort(alertGroups[i].Alerts)
210233
}
211234
for i := range receivers {
212235
sort.Strings(receivers[i])
213236
}
214237

215-
return groups, receivers
238+
return alertGroups, receivers
216239
}
217240

218241
// Stop the dispatcher.
@@ -236,42 +259,31 @@ func (d *Dispatcher) Stop() {
236259
// and inserts it.
237260
func (d *Dispatcher) processAlert(alert *types.Alert, route *Route) {
238261
groupLabels := getGroupLabels(alert, route)
239-
240262
fp := groupLabels.Fingerprint()
241263

242-
d.mtx.Lock()
243-
defer d.mtx.Unlock()
244-
245-
routeGroups, ok := d.aggrGroupsPerRoute[route]
246-
if !ok {
247-
routeGroups = map[model.Fingerprint]*aggrGroup{}
248-
d.aggrGroupsPerRoute[route] = routeGroups
249-
}
250-
251-
ag, ok := routeGroups[fp]
252-
if ok {
253-
ag.insert(alert)
264+
routeGroups := d.aggrGroupsPerRoute.AddRoute(route)
265+
group := routeGroups.GetGroup(fp)
266+
if group != nil {
267+
group.insert(alert)
254268
return
255269
}
256270

257-
// If the group does not exist, create it. But check the limit first.
258-
if limit := d.limits.MaxNumberOfAggregationGroups(); limit > 0 && d.aggrGroupsNum >= limit {
271+
// If the group does not exist, create it.
272+
group, count, limit := routeGroups.AddGroup(fp, newAggrGroup(d.ctx, groupLabels, route, d.timeout, d.logger))
273+
if group == nil {
274+
// Rate limited.
259275
d.metrics.aggrGroupLimitReached.Inc()
260-
d.logger.Error("Too many aggregation groups, cannot create new group for alert", "groups", d.aggrGroupsNum, "limit", limit, "alert", alert.Name())
276+
d.logger.Error("Too many aggregation groups, cannot create new group for alert", "groups", count, "limit", limit)
261277
return
262278
}
263-
264-
ag = newAggrGroup(d.ctx, groupLabels, route, d.timeout, d.logger)
265-
routeGroups[fp] = ag
266-
d.aggrGroupsNum++
267279
d.metrics.aggrGroups.Inc()
268280

269281
// Insert the 1st alert in the group before starting the group's run()
270282
// function, to make sure that when the run() will be executed the 1st
271283
// alert is already there.
272-
ag.insert(alert)
284+
group.insert(alert)
273285

274-
go ag.run(func(ctx context.Context, alerts ...*types.Alert) bool {
286+
go group.run(func(ctx context.Context, alerts ...*types.Alert) bool {
275287
_, _, err := d.stage.Exec(ctx, d.logger, alerts...)
276288
if err != nil {
277289
logger := d.logger.With("num_alerts", len(alerts), "err", err)

0 commit comments

Comments
 (0)