@@ -23,6 +23,10 @@ import (
23
23
"github.com/cortexproject/cortex/pkg/util/services"
24
24
)
25
25
26
+ const (
27
+ userReplicaGroupUpdateInterval = 30 * time .Second
28
+ )
29
+
26
30
var (
27
31
errNegativeUpdateTimeoutJitterMax = errors .New ("HA tracker max update timeout jitter shouldn't be negative" )
28
32
errInvalidFailoverTimeout = "HA Tracker failover timeout (%v) must be at least 1s greater than update timeout - max jitter (%v)"
@@ -137,6 +141,7 @@ type HATracker struct {
137
141
electedReplicaTimestamp * prometheus.GaugeVec
138
142
electedReplicaPropagationTime prometheus.Histogram
139
143
kvCASCalls * prometheus.CounterVec
144
+ userReplicaGroupCount * prometheus.GaugeVec
140
145
141
146
cleanupRuns prometheus.Counter
142
147
replicasMarkedForDeletion prometheus.Counter
@@ -182,6 +187,11 @@ func NewHATracker(cfg HATrackerConfig, limits HATrackerLimits, trackerStatusConf
182
187
Help : "The total number of CAS calls to the KV store for a user ID/cluster." ,
183
188
}, []string {"user" , "cluster" }),
184
189
190
+ userReplicaGroupCount : promauto .With (reg ).NewGaugeVec (prometheus.GaugeOpts {
191
+ Name : "ha_tracker_user_replica_group_count" ,
192
+ Help : "Number of HA replica groups tracked for each user." ,
193
+ }, []string {"user" }),
194
+
185
195
cleanupRuns : promauto .With (reg ).NewCounter (prometheus.CounterOpts {
186
196
Name : "ha_tracker_replicas_cleanup_started_total" ,
187
197
Help : "Number of elected replicas cleanup loops started." ,
@@ -227,11 +237,26 @@ func (c *HATracker) loop(ctx context.Context) error {
227
237
228
238
// Start cleanup loop. It will stop when context is done.
229
239
wg := sync.WaitGroup {}
230
- wg .Add (1 )
240
+ wg .Add (2 )
231
241
go func () {
232
242
defer wg .Done ()
233
243
c .cleanupOldReplicasLoop (ctx )
234
244
}()
245
+ // Start periodic update of user replica group count.
246
+ go func () {
247
+ defer wg .Done ()
248
+ ticker := time .NewTicker (userReplicaGroupUpdateInterval )
249
+ defer ticker .Stop ()
250
+
251
+ for {
252
+ select {
253
+ case <- ticker .C :
254
+ c .updateUserReplicaGroupCount ()
255
+ case <- ctx .Done ():
256
+ return
257
+ }
258
+ }
259
+ }()
235
260
236
261
// The KVStore config we gave when creating c should have contained a prefix,
237
262
// which would have given us a prefixed KVStore client. So, we can pass empty string here.
@@ -504,6 +529,9 @@ func (c *HATracker) CleanupHATrackerMetricsForUser(userID string) {
504
529
if err := util .DeleteMatchingLabels (c .kvCASCalls , filter ); err != nil {
505
530
level .Warn (c .logger ).Log ("msg" , "failed to remove cortex_ha_tracker_kv_store_cas_total metric for user" , "user" , userID , "err" , err )
506
531
}
532
+ if err := util .DeleteMatchingLabels (c .userReplicaGroupCount , filter ); err != nil {
533
+ level .Warn (c .logger ).Log ("msg" , "failed to remove cortex_ha_tracker_user_replica_group_count metric for user" , "user" , userID , "err" , err )
534
+ }
507
535
}
508
536
509
537
// Returns a snapshot of the currently elected replicas. Useful for status display
@@ -521,3 +549,12 @@ func (c *HATracker) SnapshotElectedReplicas() map[string]ReplicaDesc {
521
549
}
522
550
return electedCopy
523
551
}
552
+
553
+ func (t * HATracker ) updateUserReplicaGroupCount () {
554
+ t .electedLock .RLock ()
555
+ defer t .electedLock .RUnlock ()
556
+
557
+ for user , groups := range t .replicaGroups {
558
+ t .userReplicaGroupCount .WithLabelValues (user ).Set (float64 (len (groups )))
559
+ }
560
+ }
0 commit comments