Skip to content

Commit 36f835d

Browse files
committed
feat(dispatch): add nested map implementation
Add custom nested map implementation with locks at root and branches. This allows to read/write operation to not always block each other. Signed-off-by: Siavash Safi <[email protected]>
1 parent 2815718 commit 36f835d

File tree

1 file changed

+135
-0
lines changed

1 file changed

+135
-0
lines changed

dispatch/map.go

Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
// Copyright Prometheus Team
2+
// Licensed under the Apache License, Version 2.0 (the "License");
3+
// you may not use this file except in compliance with the License.
4+
// You may obtain a copy of the License at
5+
//
6+
// http://www.apache.org/licenses/LICENSE-2.0
7+
//
8+
// Unless required by applicable law or agreed to in writing, software
9+
// distributed under the License is distributed on an "AS IS" BASIS,
10+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
// See the License for the specific language governing permissions and
12+
// limitations under the License.
13+
14+
package dispatch
15+
16+
import (
17+
"sync"
18+
19+
"go.uber.org/atomic"
20+
21+
"github.com/prometheus/common/model"
22+
)
23+
24+
// routeGroups is a map of routes to fingerprintGroups.
25+
// It is a nested map implementation which avoid lock contention.
26+
// The outer map is protected by a single mutex.
27+
// The inner maps are protected by dedicated mutexes.
28+
// The shared atomic counter is used to track the number of nested groups.
29+
// Limits are shared between all groups.
30+
// // Each branch of the map can hold its own R(W)Lock.
31+
type routeGroups struct {
32+
mu sync.RWMutex
33+
routeGroups map[*Route]*fingerprintGroups
34+
groupsNum *atomic.Int64
35+
limits Limits
36+
}
37+
38+
// AddRoute adds a new route to the map, initializing the inner maps if needed.
39+
// If the route already exists, it returns the existing fingerprintGroups.
40+
func (rg *routeGroups) AddRoute(route *Route) *fingerprintGroups {
41+
rg.mu.Lock()
42+
defer rg.mu.Unlock()
43+
if rg.routeGroups == nil {
44+
rg.routeGroups = make(map[*Route]*fingerprintGroups)
45+
}
46+
if rg.routeGroups[route] == nil {
47+
rg.routeGroups[route] = &fingerprintGroups{
48+
aggrGroups: make(map[model.Fingerprint]*aggrGroup),
49+
groupsNum: rg.groupsNum,
50+
limits: rg.limits,
51+
}
52+
}
53+
return rg.routeGroups[route]
54+
}
55+
56+
// GetRoute returns the fingerprintGroups for the given route.
57+
func (rg *routeGroups) GetRoute(route *Route) *fingerprintGroups {
58+
rg.mu.RLock()
59+
defer rg.mu.RUnlock()
60+
return rg.routeGroups[route]
61+
}
62+
63+
// Range iterates over the routeGroups.
64+
func (rg *routeGroups) Range(fn func(*Route, *fingerprintGroups) bool) {
65+
rg.mu.RLock()
66+
defer rg.mu.RUnlock()
67+
for route, groups := range rg.routeGroups {
68+
if !fn(route, groups) {
69+
break
70+
}
71+
}
72+
}
73+
74+
// fingerprintGroups is a map of fingerprints to aggregation groups.
75+
// It is protected by a dedicated RW mutex.
76+
// It inherits the shared atomic counter from the parent routeGroups to track the number of total groups.
77+
// It inherits the limits from the parent routeGroups.
78+
type fingerprintGroups struct {
79+
mu sync.RWMutex
80+
aggrGroups map[model.Fingerprint]*aggrGroup
81+
groupsNum *atomic.Int64
82+
limits Limits
83+
}
84+
85+
// LimitReached checks if the number of groups has reached the limit.
86+
func (fg *fingerprintGroups) LimitReached() bool {
87+
if limit := fg.limits.MaxNumberOfAggregationGroups(); limit > 0 && fg.groupsNum.Load() >= int64(limit) {
88+
return true
89+
}
90+
return false
91+
}
92+
93+
// AddGroup adds a new aggregation group to the map, initializing the inner maps if needed.
94+
// If the group already exists, it returns the existing aggregation group.
95+
func (fg *fingerprintGroups) AddGroup(fp model.Fingerprint, ag *aggrGroup) (group *aggrGroup, count int64, limit int) {
96+
fg.mu.Lock()
97+
defer fg.mu.Unlock()
98+
99+
if fg.aggrGroups == nil {
100+
fg.aggrGroups = make(map[model.Fingerprint]*aggrGroup)
101+
}
102+
// Check if we've reached the rate limit before creating a new group.
103+
if fg.LimitReached() {
104+
return nil, fg.groupsNum.Load(), fg.limits.MaxNumberOfAggregationGroups()
105+
}
106+
fg.aggrGroups[fp] = ag
107+
fg.groupsNum.Add(1)
108+
return ag, fg.groupsNum.Load(), fg.limits.MaxNumberOfAggregationGroups()
109+
}
110+
111+
// RemoveGroup removes an aggregation group from the map.
112+
func (fg *fingerprintGroups) RemoveGroup(fp model.Fingerprint) {
113+
fg.mu.Lock()
114+
defer fg.mu.Unlock()
115+
delete(fg.aggrGroups, fp)
116+
fg.groupsNum.Sub(1)
117+
}
118+
119+
// GetGroup returns an aggregation group by fingerprint.
120+
func (fg *fingerprintGroups) GetGroup(fp model.Fingerprint) *aggrGroup {
121+
fg.mu.RLock()
122+
defer fg.mu.RUnlock()
123+
return fg.aggrGroups[fp]
124+
}
125+
126+
// Range iterates over the fingerprintGroups.
127+
func (fg *fingerprintGroups) Range(fn func(model.Fingerprint, *aggrGroup) bool) {
128+
fg.mu.RLock()
129+
defer fg.mu.RUnlock()
130+
for fp, ag := range fg.aggrGroups {
131+
if !fn(fp, ag) {
132+
break
133+
}
134+
}
135+
}

0 commit comments

Comments
 (0)