Skip to content

Commit 95bc22f

Browse files
committed
feat(dispatch): add nested map implementation
Add custom nested map implementation with locks at root and branches. This allows to read/write operation to not always block each other. Signed-off-by: Siavash Safi <[email protected]>
1 parent 066008f commit 95bc22f

File tree

1 file changed

+138
-0
lines changed

1 file changed

+138
-0
lines changed

dispatch/map.go

Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
// Copyright Prometheus Team
2+
// Licensed under the Apache License, Version 2.0 (the "License");
3+
// you may not use this file except in compliance with the License.
4+
// You may obtain a copy of the License at
5+
//
6+
// http://www.apache.org/licenses/LICENSE-2.0
7+
//
8+
// Unless required by applicable law or agreed to in writing, software
9+
// distributed under the License is distributed on an "AS IS" BASIS,
10+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
// See the License for the specific language governing permissions and
12+
// limitations under the License.
13+
14+
package dispatch
15+
16+
import (
17+
"sync"
18+
19+
"go.uber.org/atomic"
20+
21+
"github.com/prometheus/common/model"
22+
)
23+
24+
// routeGroups is a map of routes to fingerprintGroups.
25+
// It is a nested map implementation which avoid lock contention:
26+
//
27+
// - outer map is protected by a single mutex
28+
// - inner maps are protected by dedicated mutexes
29+
// - shared atomic counter is used to track the number of nested groups
30+
// - limits are shared between all groups
31+
//
32+
// each branch of the map can hold its own R(W)Lock.
33+
type routeGroups struct {
34+
mu sync.RWMutex
35+
routeGroups map[*Route]*fingerprintGroups
36+
snapshot *routeGroups
37+
groupsNum *atomic.Int64
38+
limits Limits
39+
}
40+
41+
// addRoute adds a new route to the map, initializing the inner maps if needed.
42+
// If the route already exists, it returns the existing fingerprintGroups.
43+
func (rg *routeGroups) addRoute(route *Route) *fingerprintGroups {
44+
rg.mu.Lock()
45+
defer rg.mu.Unlock()
46+
if rg.routeGroups == nil {
47+
rg.routeGroups = make(map[*Route]*fingerprintGroups)
48+
}
49+
if rg.routeGroups[route] == nil {
50+
rg.routeGroups[route] = &fingerprintGroups{
51+
aggrGroups: make(map[model.Fingerprint]*aggrGroup),
52+
groupsNum: rg.groupsNum,
53+
limits: rg.limits,
54+
}
55+
}
56+
return rg.routeGroups[route]
57+
}
58+
59+
// getRoute returns the fingerprintGroups for the given route.
60+
func (rg *routeGroups) getRoute(route *Route) *fingerprintGroups {
61+
rg.mu.RLock()
62+
defer rg.mu.RUnlock()
63+
return rg.routeGroups[route]
64+
}
65+
66+
// Range iterates over the routeGroups.
67+
func (rg *routeGroups) Range(fn func(*Route, *fingerprintGroups) bool) {
68+
rg.mu.RLock()
69+
defer rg.mu.RUnlock()
70+
for route, groups := range rg.routeGroups {
71+
if !fn(route, groups) {
72+
break
73+
}
74+
}
75+
}
76+
77+
// fingerprintGroups is a map of fingerprints to aggregation groups.
78+
// It is protected by a dedicated RW mutex.
79+
// - it inherits the shared atomic counter from the parent routeGroups to track the number of groups
80+
// - it inherits the limits from the parent routeGroups
81+
type fingerprintGroups struct {
82+
mu sync.RWMutex
83+
aggrGroups map[model.Fingerprint]*aggrGroup
84+
groupsNum *atomic.Int64
85+
limits Limits
86+
}
87+
88+
// limitReached checks if the number of groups has reached the limit.
89+
func (fg *fingerprintGroups) limitReached() bool {
90+
if limit := fg.limits.MaxNumberOfAggregationGroups(); limit > 0 && fg.groupsNum.Load() >= int64(limit) {
91+
return true
92+
}
93+
return false
94+
}
95+
96+
// addGroup adds a new aggregation group to the map, initializing the inner maps if needed.
97+
// If the group already exists, it returns the existing aggregation group.
98+
func (fg *fingerprintGroups) addGroup(fp model.Fingerprint, ag *aggrGroup) (group *aggrGroup, count int64, limit int) {
99+
fg.mu.Lock()
100+
defer fg.mu.Unlock()
101+
102+
if fg.aggrGroups == nil {
103+
fg.aggrGroups = make(map[model.Fingerprint]*aggrGroup)
104+
}
105+
// Check if we've reached the rate limit before creating a new group.
106+
if fg.limitReached() {
107+
return nil, fg.groupsNum.Load(), fg.limits.MaxNumberOfAggregationGroups()
108+
}
109+
fg.aggrGroups[fp] = ag
110+
fg.groupsNum.Add(1)
111+
return ag, fg.groupsNum.Load(), fg.limits.MaxNumberOfAggregationGroups()
112+
}
113+
114+
// removeGroup removes an aggregation group from the map.
115+
func (fg *fingerprintGroups) removeGroup(fp model.Fingerprint) {
116+
fg.mu.Lock()
117+
defer fg.mu.Unlock()
118+
delete(fg.aggrGroups, fp)
119+
fg.groupsNum.Sub(1)
120+
}
121+
122+
// getGroup returns an aggregation group by fingerprint.
123+
func (fg *fingerprintGroups) getGroup(fp model.Fingerprint) *aggrGroup {
124+
fg.mu.RLock()
125+
defer fg.mu.RUnlock()
126+
return fg.aggrGroups[fp]
127+
}
128+
129+
// Range iterates over the fingerprintGroups.
130+
func (fg *fingerprintGroups) Range(fn func(model.Fingerprint, *aggrGroup) bool) {
131+
fg.mu.RLock()
132+
defer fg.mu.RUnlock()
133+
for fp, ag := range fg.aggrGroups {
134+
if !fn(fp, ag) {
135+
break
136+
}
137+
}
138+
}

0 commit comments

Comments
 (0)