diff --git a/vertical-pod-autoscaler/docs/flags.md b/vertical-pod-autoscaler/docs/flags.md index 9eb45454108d..efea52ccb8a8 100644 --- a/vertical-pod-autoscaler/docs/flags.md +++ b/vertical-pod-autoscaler/docs/flags.md @@ -125,7 +125,7 @@ This document is auto-generated from the flag definitions in the VPA recommender | `storage` | string | | Specifies storage mode. Supported values: prometheus, checkpoint | | `target-cpu-percentile` | float | 0.9 | CPU usage percentile that will be used as a base for CPU target recommendation. Doesn't affect CPU lower bound, CPU upper bound nor memory recommendations. | | `target-memory-percentile` | float | 0.9 | Memory usage percentile that will be used as a base for memory target recommendation. Doesn't affect memory lower bound nor memory upper bound. | -| `update-worker-count` | | 10 | kube-api-qps Number of concurrent workers to update VPA recommendations and checkpoints. When increasing this setting, make sure the client-side rate limits (kube-api-qps and `kube-api-burst`) are either increased or turned off as well. Determines the minimum number of VPA checkpoints written per recommender loop. | +| `update-worker-count` | int | 10 | Number of concurrent workers to update VPA recommendations and checkpoints. When increasing this setting, make sure the client-side rate limits ('kube-api-qps' and 'kube-api-burst') are either increased or turned off as well. Determines the minimum number of VPA checkpoints written per recommender loop. | | `use-external-metrics` | | | ALPHA. Use an external metrics provider instead of metrics_server. | | `username` | string | | The username used in the prometheus server basic auth | | `v,` | | : 4 | , --v Level set the log level verbosity (default 4) | diff --git a/vertical-pod-autoscaler/pkg/recommender/main.go b/vertical-pod-autoscaler/pkg/recommender/main.go index ed7eb363efe5..48bfc814dccf 100644 --- a/vertical-pod-autoscaler/pkg/recommender/main.go +++ b/vertical-pod-autoscaler/pkg/recommender/main.go @@ -66,7 +66,7 @@ var ( address = flag.String("address", ":8942", "The address to expose Prometheus metrics.") storage = flag.String("storage", "", `Specifies storage mode. Supported values: prometheus, checkpoint (default)`) memorySaver = flag.Bool("memory-saver", false, `If true, only track pods which have an associated VPA`) - updateWorkerCount = flag.Int("update-worker-count", 10, "Number of concurrent workers to update VPA recommendations and checkpoints. When increasing this setting, make sure the client-side rate limits (`kube-api-qps` and `kube-api-burst`) are either increased or turned off as well. Determines the minimum number of VPA checkpoints written per recommender loop.") + updateWorkerCount = flag.Int("update-worker-count", 10, "Number of concurrent workers to update VPA recommendations and checkpoints. When increasing this setting, make sure the client-side rate limits ('kube-api-qps' and 'kube-api-burst') are either increased or turned off as well. Determines the minimum number of VPA checkpoints written per recommender loop.") ) // Prometheus history provider flags diff --git a/vertical-pod-autoscaler/pkg/recommender/model/aggregate_container_state.go b/vertical-pod-autoscaler/pkg/recommender/model/aggregate_container_state.go index c18e860f4ff7..9d3a8411e991 100644 --- a/vertical-pod-autoscaler/pkg/recommender/model/aggregate_container_state.go +++ b/vertical-pod-autoscaler/pkg/recommender/model/aggregate_container_state.go @@ -37,6 +37,7 @@ package model import ( "fmt" + "sync" "time" corev1 "k8s.io/api/core/v1" @@ -105,16 +106,27 @@ type AggregateContainerState struct { // we want to know if it needs recommendation, if the recommendation // is present and if the automatic updates are on (are we able to // apply the recommendation to the pods). - LastRecommendation corev1.ResourceList + lastRecommendation corev1.ResourceList IsUnderVPA bool UpdateMode *vpa_types.UpdateMode ScalingMode *vpa_types.ContainerScalingMode ControlledResources *[]ResourceName + + mutex sync.RWMutex } // GetLastRecommendation returns last recorded recommendation. func (a *AggregateContainerState) GetLastRecommendation() corev1.ResourceList { - return a.LastRecommendation + a.mutex.RLock() + defer a.mutex.RUnlock() + return a.lastRecommendation +} + +// SetLastRecommendation sets the last recorded recommendation in a thread-safe manner. +func (a *AggregateContainerState) SetLastRecommendation(recommendation corev1.ResourceList) { + a.mutex.Lock() + defer a.mutex.Unlock() + a.lastRecommendation = recommendation } // NeedsRecommendation returns true if the state should have recommendation calculated. @@ -147,7 +159,7 @@ func (a *AggregateContainerState) GetControlledResources() []ResourceName { // a VPA object. func (a *AggregateContainerState) MarkNotAutoscaled() { a.IsUnderVPA = false - a.LastRecommendation = nil + a.lastRecommendation = nil a.UpdateMode = nil a.ScalingMode = nil a.ControlledResources = nil diff --git a/vertical-pod-autoscaler/pkg/recommender/model/cluster.go b/vertical-pod-autoscaler/pkg/recommender/model/cluster.go index a16f22f8825e..780a9e0a0af0 100644 --- a/vertical-pod-autoscaler/pkg/recommender/model/cluster.go +++ b/vertical-pod-autoscaler/pkg/recommender/model/cluster.go @@ -19,6 +19,7 @@ package model import ( "context" "fmt" + "sync" "time" apiv1 "k8s.io/api/core/v1" @@ -82,6 +83,9 @@ type clusterState struct { lastAggregateContainerStateGC time.Time gcInterval time.Duration + + // Mutex to protect concurrent access to maps + mutex sync.RWMutex } // StateMapSize is the number of pods being tracked by the VPA @@ -464,6 +468,8 @@ func (cluster *clusterState) getContributiveAggregateStateKeys(ctx context.Conte // keep track of empty recommendations and log information about them // periodically. func (cluster *clusterState) RecordRecommendation(vpa *Vpa, now time.Time) error { + cluster.mutex.Lock() + defer cluster.mutex.Unlock() if vpa.Recommendation != nil && len(vpa.Recommendation.ContainerRecommendations) > 0 { delete(cluster.emptyVPAs, vpa.ID) return nil diff --git a/vertical-pod-autoscaler/pkg/recommender/model/vpa.go b/vertical-pod-autoscaler/pkg/recommender/model/vpa.go index 0c505ae10f31..f346a62e37e7 100644 --- a/vertical-pod-autoscaler/pkg/recommender/model/vpa.go +++ b/vertical-pod-autoscaler/pkg/recommender/model/vpa.go @@ -166,8 +166,8 @@ func (vpa *Vpa) UpdateRecommendation(recommendation *vpa_types.RecommendedPodRes for _, containerRecommendation := range recommendation.ContainerRecommendations { for container, state := range vpa.aggregateContainerStates { if container.ContainerName() == containerRecommendation.ContainerName { - metrics_quality.ObserveRecommendationChange(state.LastRecommendation, containerRecommendation.UncappedTarget, vpa.UpdateMode, vpa.PodCount) - state.LastRecommendation = containerRecommendation.UncappedTarget + metrics_quality.ObserveRecommendationChange(state.GetLastRecommendation(), containerRecommendation.UncappedTarget, vpa.UpdateMode, vpa.PodCount) + state.SetLastRecommendation(containerRecommendation.UncappedTarget) } } } diff --git a/vertical-pod-autoscaler/pkg/recommender/model/vpa_test.go b/vertical-pod-autoscaler/pkg/recommender/model/vpa_test.go index 1812a824507f..4f19380a4845 100644 --- a/vertical-pod-autoscaler/pkg/recommender/model/vpa_test.go +++ b/vertical-pod-autoscaler/pkg/recommender/model/vpa_test.go @@ -193,7 +193,7 @@ func TestUpdateRecommendation(t *testing.T) { for container, rec := range tc.containers { state := &AggregateContainerState{} if rec != nil { - state.LastRecommendation = corev1.ResourceList{ + state.lastRecommendation = corev1.ResourceList{ corev1.ResourceCPU: resource.MustParse(rec.cpu), corev1.ResourceMemory: resource.MustParse(rec.mem), } @@ -209,9 +209,9 @@ func TestUpdateRecommendation(t *testing.T) { for key, state := range vpa.aggregateContainerStates { expected, ok := tc.expectedLast[key.ContainerName()] if !ok { - assert.Nil(t, state.LastRecommendation) + assert.Nil(t, state.lastRecommendation) } else { - assert.Equal(t, expected, state.LastRecommendation) + assert.Equal(t, expected, state.lastRecommendation) } } }) diff --git a/vertical-pod-autoscaler/pkg/utils/metrics/recommender/recommender.go b/vertical-pod-autoscaler/pkg/utils/metrics/recommender/recommender.go index 700c26bcfea1..acd45463e3b6 100644 --- a/vertical-pod-autoscaler/pkg/utils/metrics/recommender/recommender.go +++ b/vertical-pod-autoscaler/pkg/utils/metrics/recommender/recommender.go @@ -21,6 +21,7 @@ import ( "fmt" "net/http" "strconv" + "sync" "time" "github.com/prometheus/client_golang/prometheus" @@ -119,7 +120,8 @@ type objectCounterKey struct { // ObjectCounter helps split all VPA objects into buckets type ObjectCounter struct { - cnt map[objectCounterKey]int + cnt map[objectCounterKey]int + mutex sync.Mutex } // Register initializes all metrics for VPA Recommender @@ -189,7 +191,9 @@ func (oc *ObjectCounter) Add(vpa *model.Vpa) { matchesPods: vpa.HasMatchedPods(), unsupportedConfig: vpa.Conditions.ConditionActive(vpa_types.ConfigUnsupported), } + oc.mutex.Lock() oc.cnt[key]++ + oc.mutex.Unlock() } // Observe passes all the computed bucket values to metrics