Skip to content

Commit 362e902

Browse files
authored
merge Prometheus metrics for removed user to avoid having to recompute metrics from scratch everytime (#4813)
* merge Prometheus metrics for removed user to avoid having to recompute metrics from scratch everytime Signed-off-by: Roy Chiang <[email protected]> * run linter Signed-off-by: Roy Chiang <[email protected]>
1 parent 234bdde commit 362e902

File tree

2 files changed

+293
-5
lines changed

2 files changed

+293
-5
lines changed

pkg/util/metrics_helper.go

Lines changed: 190 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,11 @@ import (
55
"errors"
66
"fmt"
77
"math"
8+
"strings"
89
"sync"
910

11+
"github.com/gogo/protobuf/proto"
12+
1013
"github.com/go-kit/log/level"
1114
"github.com/prometheus/client_golang/prometheus"
1215
dto "github.com/prometheus/client_model/go"
@@ -562,13 +565,14 @@ type UserRegistry struct {
562565
// UserRegistries holds Prometheus registries for multiple users, guaranteeing
563566
// multi-thread safety and stable ordering.
564567
type UserRegistries struct {
565-
regsMu sync.Mutex
566-
regs []UserRegistry
568+
regsMu sync.Mutex
569+
regs []UserRegistry
570+
removedMetrics MetricFamilyMap
567571
}
568572

569573
// NewUserRegistries makes new UserRegistries.
570574
func NewUserRegistries() *UserRegistries {
571-
return &UserRegistries{}
575+
return &UserRegistries{removedMetrics: make(MetricFamilyMap)}
572576
}
573577

574578
// AddUserRegistry adds an user registry. If user already has a registry,
@@ -647,15 +651,21 @@ func (r *UserRegistries) softRemoveUserRegistry(ur *UserRegistry) bool {
647651
return false
648652
}
649653

650-
ur.lastGather, err = NewMetricFamilyMap(last)
654+
gatheredMetrics, err := NewMetricFamilyMap(last)
651655
if err != nil {
652656
level.Warn(util_log.Logger).Log("msg", "failed to gather metrics from registry", "user", ur.user, "err", err)
653657
return false
654658
}
655659

660+
aggregatedMetrics, err := MergeMetricFamilies([]MetricFamilyMap{gatheredMetrics, r.removedMetrics})
661+
if err != nil {
662+
level.Warn(util_log.Logger).Log("msg", "failed to merge metrics", "user", ur.user, "err", err)
663+
return false
664+
}
665+
r.removedMetrics = aggregatedMetrics
656666
ur.user = ""
657667
ur.reg = nil
658-
return true
668+
return false
659669
}
660670

661671
// Registries returns a copy of the user registries list.
@@ -704,6 +714,11 @@ func (r *UserRegistries) BuildMetricFamiliesPerUser() MetricFamiliesPerUser {
704714
continue
705715
}
706716
}
717+
data = append(data, struct {
718+
user string
719+
metrics MetricFamilyMap
720+
}{
721+
user: "", metrics: r.removedMetrics})
707722
return data
708723
}
709724

@@ -805,3 +820,173 @@ type CollectorVec interface {
805820
prometheus.Collector
806821
Delete(labels prometheus.Labels) bool
807822
}
823+
824+
type MergedMetricFamily struct {
825+
metricFamily *dto.MetricFamily
826+
metricMap MetricMap
827+
}
828+
829+
func (m *MergedMetricFamily) CreateMetricFamily() *dto.MetricFamily {
830+
newMetricFamily := proto.Clone(m.metricFamily).(*dto.MetricFamily)
831+
var metrics []*dto.Metric
832+
833+
for _, metric := range m.metricMap.metrics {
834+
for _, m := range metric {
835+
metrics = append(metrics, &m.metric)
836+
}
837+
}
838+
839+
newMetricFamily.Metric = metrics
840+
return newMetricFamily
841+
}
842+
843+
// MergeMetricFamilies - Capable of merging summaries, histograms, and counters
844+
func MergeMetricFamilies(metricFamilies []MetricFamilyMap) (MetricFamilyMap, error) {
845+
mergedMap := make(map[string]*MergedMetricFamily)
846+
847+
for _, mf := range metricFamilies {
848+
for metricName, metricFamily := range mf {
849+
mergeFunc, err := getMergeFunc(metricFamily.GetType())
850+
if err != nil {
851+
return nil, err
852+
}
853+
854+
if _, found := mergedMap[metricName]; !found {
855+
mergedMap[metricName] = &MergedMetricFamily{metricFamily: proto.Clone(metricFamily).(*dto.MetricFamily), metricMap: NewMetricMap()}
856+
}
857+
858+
for _, metric := range metricFamily.Metric {
859+
(mergedMap[metricName].metricMap).AddOrSetMetric(*metric, mergeFunc)
860+
}
861+
}
862+
}
863+
864+
metricFamilyMap := make(MetricFamilyMap)
865+
for metricName, mergedMetricFamily := range mergedMap {
866+
metricFamilyMap[metricName] = mergedMetricFamily.CreateMetricFamily()
867+
}
868+
869+
return metricFamilyMap, nil
870+
}
871+
872+
func getMergeFunc(metricType dto.MetricType) (func(existing *dto.Metric, new *dto.Metric), error) {
873+
switch metricType {
874+
case dto.MetricType_SUMMARY:
875+
return mergeSummary, nil
876+
case dto.MetricType_COUNTER:
877+
return mergeCounter, nil
878+
case dto.MetricType_HISTOGRAM:
879+
return mergeHistogram, nil
880+
default:
881+
return nil, fmt.Errorf("unknown metric type: %v", metricType)
882+
}
883+
}
884+
885+
func mergeCounter(mf1, mf2 *dto.Metric) {
886+
newValue := *mf1.Counter.Value + *mf2.Counter.Value
887+
mf1.Counter.Value = &newValue
888+
}
889+
890+
func mergeHistogram(mf1, mf2 *dto.Metric) {
891+
bucketMap := map[float64]uint64{}
892+
893+
for _, bucket := range append(mf1.Histogram.GetBucket(), mf2.Histogram.GetBucket()...) {
894+
bucketMap[bucket.GetUpperBound()] += bucket.GetCumulativeCount()
895+
}
896+
897+
var newBucket []*dto.Bucket
898+
for upperBound, cumulativeCount := range bucketMap {
899+
ubValue := upperBound
900+
ccValue := cumulativeCount
901+
newBucket = append(newBucket, &dto.Bucket{UpperBound: &ubValue, CumulativeCount: &ccValue})
902+
}
903+
904+
newSampleCount := *mf1.Histogram.SampleCount + *mf2.Histogram.SampleCount
905+
newSampleSum := *mf1.Histogram.SampleSum + *mf2.Histogram.SampleSum
906+
mf1.Histogram.Bucket = newBucket
907+
mf1.Histogram.SampleCount = &newSampleCount
908+
mf1.Histogram.SampleSum = &newSampleSum
909+
}
910+
911+
func mergeSummary(mf1 *dto.Metric, mf2 *dto.Metric) {
912+
newSampleCount := *mf1.Summary.SampleCount + *mf2.Summary.SampleCount
913+
newSampleSum := *mf1.Summary.SampleSum + *mf2.Summary.SampleSum
914+
915+
// we are not merging the Quantiles themselves because there's no operation that makes sense
916+
mf1.Summary.Quantile = []*dto.Quantile{}
917+
mf1.Summary.SampleCount = &newSampleCount
918+
mf1.Summary.SampleSum = &newSampleSum
919+
}
920+
921+
type MetricMap struct {
922+
metrics map[string][]*Metric
923+
lock sync.Mutex
924+
}
925+
926+
type Metric struct {
927+
metric dto.Metric
928+
lock sync.Mutex
929+
}
930+
931+
func NewMetricMap() MetricMap {
932+
return MetricMap{
933+
metrics: make(map[string][]*Metric),
934+
}
935+
}
936+
937+
// AddOrSetMetric - given a metric, see if there's another metric with the same labels. If not, add metric to list
938+
// If yes, call mergeFn to merge the two metrics in-place, and updating existing metric
939+
func (m *MetricMap) AddOrSetMetric(metric dto.Metric, mergeFn func(existing *dto.Metric, new *dto.Metric)) {
940+
var metricLabels []string
941+
for _, labelPair := range metric.GetLabel() {
942+
metricLabels = append(metricLabels, fmt.Sprintf("%s=%s", labelPair.GetName(), labelPair.GetValue()))
943+
}
944+
945+
metricKey := strings.Join(metricLabels, ",")
946+
947+
m.lock.Lock()
948+
defer m.lock.Unlock()
949+
950+
if metrics, found := m.metrics[metricKey]; found {
951+
// we might have hash collision, so let's iterate through the list to make sure the item is actually what we want
952+
for _, existingMetric := range metrics {
953+
same := m.compareLabels(existingMetric.metric.GetLabel(), metric.GetLabel())
954+
if same {
955+
existingMetric.lock.Lock()
956+
mergeFn(&existingMetric.metric, &metric)
957+
existingMetric.lock.Unlock()
958+
return
959+
}
960+
}
961+
// only get there if we don't have the same metric, so let's append it
962+
m.metrics[metricKey] = append(m.metrics[metricKey], &Metric{metric: metric})
963+
return
964+
}
965+
966+
// no such key, so let's add it
967+
m.metrics[metricKey] = []*Metric{{metric: metric}}
968+
}
969+
970+
func (m *MetricMap) compareLabels(labels1, labels2 []*dto.LabelPair) bool {
971+
if len(labels1) != len(labels2) {
972+
return false
973+
}
974+
975+
// create a map of labels for lookup
976+
labelMap := make(map[string]string)
977+
for _, labelPair := range labels1 {
978+
labelMap[labelPair.GetName()] = labelPair.GetValue()
979+
}
980+
981+
for _, labelPair := range labels2 {
982+
if value, found := labelMap[labelPair.GetName()]; found {
983+
if value != labelPair.GetValue() {
984+
return false
985+
}
986+
} else {
987+
return false
988+
}
989+
}
990+
991+
return true
992+
}

pkg/util/metrics_helper_test.go

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -724,6 +724,85 @@ func TestUserRegistries_RemoveUserRegistry_SoftRemoval(t *testing.T) {
724724
summary_user_sum{user="5"} 25
725725
summary_user_count{user="5"} 5
726726
`)))
727+
728+
tm.regs.RemoveUserRegistry(strconv.Itoa(4), false)
729+
require.NoError(t, testutil.GatherAndCompare(mainRegistry, bytes.NewBufferString(`
730+
# HELP counter help
731+
# TYPE counter counter
732+
# No change in counter
733+
counter 75
734+
735+
# HELP counter_labels help
736+
# TYPE counter_labels counter
737+
# No change in counter per label.
738+
counter_labels{label_one="a"} 75
739+
740+
# HELP counter_user help
741+
# TYPE counter_user counter
742+
# User 3 is now missing.
743+
counter_user{user="1"} 5
744+
counter_user{user="2"} 10
745+
counter_user{user="5"} 25
746+
747+
# HELP gauge help
748+
# TYPE gauge gauge
749+
# Drop in the gauge (value 3, counted 5 times)
750+
gauge 40
751+
752+
# HELP gauge_labels help
753+
# TYPE gauge_labels gauge
754+
# Drop in the gauge (value 3, counted 5 times)
755+
gauge_labels{label_one="a"} 40
756+
757+
# HELP gauge_user help
758+
# TYPE gauge_user gauge
759+
# User 3 is now missing.
760+
gauge_user{user="1"} 5
761+
gauge_user{user="2"} 10
762+
gauge_user{user="5"} 25
763+
764+
# HELP histogram help
765+
# TYPE histogram histogram
766+
# No change in the histogram
767+
histogram_bucket{le="1"} 5
768+
histogram_bucket{le="3"} 15
769+
histogram_bucket{le="5"} 25
770+
histogram_bucket{le="+Inf"} 25
771+
histogram_sum 75
772+
histogram_count 25
773+
774+
# HELP histogram_labels help
775+
# TYPE histogram_labels histogram
776+
# No change in the histogram per label
777+
histogram_labels_bucket{label_one="a",le="1"} 5
778+
histogram_labels_bucket{label_one="a",le="3"} 15
779+
histogram_labels_bucket{label_one="a",le="5"} 25
780+
histogram_labels_bucket{label_one="a",le="+Inf"} 25
781+
histogram_labels_sum{label_one="a"} 75
782+
histogram_labels_count{label_one="a"} 25
783+
784+
# HELP summary help
785+
# TYPE summary summary
786+
# No change in the summary
787+
summary_sum 75
788+
summary_count 25
789+
790+
# HELP summary_labels help
791+
# TYPE summary_labels summary
792+
# No change in the summary per label
793+
summary_labels_sum{label_one="a"} 75
794+
summary_labels_count{label_one="a"} 25
795+
796+
# HELP summary_user help
797+
# TYPE summary_user summary
798+
# Summary for user 3 is now missing.
799+
summary_user_sum{user="1"} 5
800+
summary_user_count{user="1"} 5
801+
summary_user_sum{user="2"} 10
802+
summary_user_count{user="2"} 5
803+
summary_user_sum{user="5"} 25
804+
summary_user_count{user="5"} 5
805+
`)))
727806
}
728807
func TestUserRegistries_RemoveUserRegistry_HardRemoval(t *testing.T) {
729808
tm := setupTestMetrics()
@@ -1144,6 +1223,30 @@ func TestGetLabels(t *testing.T) {
11441223
})
11451224
}
11461225

1226+
func TestMergeMetricFamilies(t *testing.T) {
1227+
tm := setupTestMetrics()
1228+
1229+
var list []MetricFamilyMap
1230+
for _, registry := range tm.regs.regs {
1231+
mfs, err := registry.reg.Gather()
1232+
var filteredMf []*dto.MetricFamily
1233+
for _, metric := range mfs {
1234+
if metric.GetType() == dto.MetricType_GAUGE {
1235+
continue
1236+
}
1237+
filteredMf = append(filteredMf, metric)
1238+
}
1239+
require.NoError(t, err)
1240+
mfm, err := NewMetricFamilyMap(filteredMf)
1241+
require.NoError(t, err)
1242+
list = append(list, mfm)
1243+
}
1244+
1245+
_, err := MergeMetricFamilies(list)
1246+
1247+
require.NoError(t, err)
1248+
}
1249+
11471250
func verifyLabels(t *testing.T, m prometheus.Collector, filter map[string]string, expectedLabels []labels.Labels) {
11481251
result, err := GetLabels(m, filter)
11491252
require.NoError(t, err)

0 commit comments

Comments
 (0)