@@ -18,6 +18,7 @@ import (
1818 "errors"
1919 "fmt"
2020 "log/slog"
21+ "maps"
2122 "sort"
2223 "sync"
2324 "time"
@@ -185,19 +186,32 @@ func (d *Dispatcher) run(it provider.AlertIterator) {
185186}
186187
187188func (d * Dispatcher ) doMaintenance () {
189+ d .mtx .RLock ()
190+ empty := make (map [* Route ]map [model.Fingerprint ]* aggrGroup )
191+ for route , groups := range d .aggrGroupsPerRoute {
192+ for fp , ag := range groups {
193+ if ag .empty () {
194+ if empty [route ] == nil {
195+ empty [route ] = make (map [model.Fingerprint ]* aggrGroup )
196+ }
197+ empty [route ][fp ] = ag
198+ }
199+ }
200+ }
201+ d .mtx .RUnlock ()
188202 d .mtx .Lock ()
189- defer d .mtx .Unlock ()
190- for _ , groups := range d .aggrGroupsPerRoute {
191- for _ , ag := range groups {
203+ for route , groups := range empty {
204+ for fp , ag := range groups {
192205 if ag .empty () {
193206 ag .stop ()
194207 d .marker .DeleteByGroupKey (ag .routeID , ag .GroupKey ())
195- delete (groups , ag . fingerprint () )
208+ delete (d . aggrGroupsPerRoute [ route ], fp )
196209 d .aggrGroupsNum --
197210 d .metrics .aggrGroups .Dec ()
198211 }
199212 }
200213 }
214+ d .mtx .Unlock ()
201215}
202216
203217// AlertGroup represents how alerts exist within an aggrGroup.
@@ -224,16 +238,19 @@ func (ag AlertGroups) Len() int { return len(ag) }
224238func (d * Dispatcher ) Groups (routeFilter func (* Route ) bool , alertFilter func (* types.Alert , time.Time ) bool ) (AlertGroups , map [model .Fingerprint ][]string ) {
225239 groups := AlertGroups {}
226240
241+ // Take a snapshot copy of the map to avoid blocking alert processing.
242+ snapshot := make (map [* Route ]map [model.Fingerprint ]* aggrGroup )
227243 d .mtx .RLock ()
228- defer d .mtx .RUnlock ()
244+ maps .Copy (d .aggrGroupsPerRoute , snapshot )
245+ d .mtx .RUnlock ()
229246
230247 // Keep a list of receivers for an alert to prevent checking each alert
231248 // again against all routes. The alert has already matched against this
232249 // route on ingestion.
233250 receivers := map [model.Fingerprint ][]string {}
234251
235252 now := time .Now ()
236- for route , ags := range d . aggrGroupsPerRoute {
253+ for route , ags := range snapshot {
237254 if ! routeFilter (route ) {
238255 continue
239256 }
@@ -316,30 +333,34 @@ func (d *Dispatcher) processAlert(alert *types.Alert, route *Route) {
316333 fp := groupLabels .Fingerprint ()
317334
// Look up (or lazily create) the per-route group index under the write lock.
318335 d .mtx .Lock ()
319- defer d .mtx .Unlock ()
320-
321336 routeGroups , ok := d .aggrGroupsPerRoute [route ]
322337 if ! ok {
323338 routeGroups = map [model.Fingerprint ]* aggrGroup {}
324339 d .aggrGroupsPerRoute [route ] = routeGroups
325340 }
326341
327342 ag , ok := routeGroups [fp ]
// NOTE(review): the lock is released here, but the lookup result is acted on
// below. Until the Lock() further down is taken, another goroutine can run the
// same sequence for the same route/fp. Both then see !ok and both create a
// group; the second `routeGroups [fp ] = ag` replaces the first, while
// aggrGroupsNum (and the aggrGroups gauge) is incremented twice.
343+ d .mtx .Unlock ()
328344 if ok {
// NOTE(review): insert now runs without d.mtx held. doMaintenance stops and
// removes groups whose empty() is true while holding the write lock, and it
// may run between the Unlock above and this call. Confirm that inserting into
// a group that was just stopped cannot lose the alert.
329345 ag .insert (alert )
330346 return
331347 }
332348
333349 // If the group does not exist, create it. But check the limit first.
334- if limit := d .limits .MaxNumberOfAggregationGroups (); limit > 0 && d .aggrGroupsNum >= limit {
// NOTE(review): aggrGroupsNum is only a snapshot once RUnlock returns.
// Concurrent callers can all pass the limit check below before any of them
// increments the counter, so the configured limit can be exceeded.
350+ d .mtx .RLock ()
351+ aggrGroupsNum := d .aggrGroupsNum
352+ d .mtx .RUnlock ()
353+ if limit := d .limits .MaxNumberOfAggregationGroups (); limit > 0 && aggrGroupsNum >= limit {
335354 d .metrics .aggrGroupLimitReached .Inc ()
336- d .logger .Error ("Too many aggregation groups, cannot create new group for alert" , "groups" , d . aggrGroupsNum , "limit" , limit , "alert" , alert .Name ())
355+ d .logger .Error ("Too many aggregation groups, cannot create new group for alert" , "groups" , aggrGroupsNum , "limit" , limit , "alert" , alert .Name ())
337356 return
338357 }
339358
340359 ag = newAggrGroup (d .ctx , groupLabels , route , d .timeout , d .logger )
// NOTE(review): routeGroups[fp] is not re-checked after re-acquiring the lock,
// so an entry stored by a concurrent caller is overwritten unconditionally.
// Keeping lookup, limit check and insertion in one critical section (or
// re-checking `routeGroups [fp ]` here) would close the window.
360+ d .mtx .Lock ()
341361 routeGroups [fp ] = ag
342362 d .aggrGroupsNum ++
363+ d .mtx .Unlock ()
343364 d .metrics .aggrGroups .Inc ()
344365
345366 // Insert the 1st alert in the group before starting the group's run()
0 commit comments