Add some race-condition protection to VPA recommender #8320


Open
wants to merge 4 commits into base: master
2 changes: 1 addition & 1 deletion vertical-pod-autoscaler/docs/flags.md
@@ -125,7 +125,7 @@ This document is auto-generated from the flag definitions in the VPA recommender
| `storage` | string | | Specifies storage mode. Supported values: prometheus, checkpoint |
| `target-cpu-percentile` | float | 0.9 | CPU usage percentile that will be used as a base for CPU target recommendation. Doesn't affect CPU lower bound, CPU upper bound nor memory recommendations. |
| `target-memory-percentile` | float | 0.9 | Memory usage percentile that will be used as a base for memory target recommendation. Doesn't affect memory lower bound nor memory upper bound. |
| `update-worker-count` | | 10 | kube-api-qps Number of concurrent workers to update VPA recommendations and checkpoints. When increasing this setting, make sure the client-side rate limits (kube-api-qps and `kube-api-burst`) are either increased or turned off as well. Determines the minimum number of VPA checkpoints written per recommender loop. |
| `update-worker-count` | int | 10 | Number of concurrent workers to update VPA recommendations and checkpoints. When increasing this setting, make sure the client-side rate limits ('kube-api-qps' and 'kube-api-burst') are either increased or turned off as well. Determines the minimum number of VPA checkpoints written per recommender loop. |
Member Author


I happened to notice this weirdness in the `--update-worker-count` entry, caused by the back-ticks, so I fixed it while I was here.

Reference: https://pkg.go.dev/flag#PrintDefaults

The listed type, here int, can be changed by placing a back-quoted name in the flag's usage string; the first such item in the message is taken to be a parameter name to show in the message and the back quotes are stripped from the message when displayed
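
For illustration, a minimal sketch of that behavior (the usage text here is made up for the example, not taken from the VPA code):

```go
package main

import "flag"

func main() {
	// The first back-quoted word in the usage string ("workers" here) is shown
	// as the parameter name in the -help output instead of the inferred type
	// ("int"), and the back quotes are stripped from the displayed message.
	flag.Int("update-worker-count", 10, "Number of concurrent `workers` to use.")

	flag.PrintDefaults()
	// Prints something like:
	//   -update-worker-count workers
	//         Number of concurrent workers to use. (default 10)
}
```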

| `use-external-metrics` | | | ALPHA. Use an external metrics provider instead of metrics_server. |
| `username` | string | | The username used in the prometheus server basic auth |
| `v,` | | : 4 | , --v Level set the log level verbosity (default 4) |
2 changes: 1 addition & 1 deletion vertical-pod-autoscaler/pkg/recommender/main.go
@@ -66,7 +66,7 @@ var (
address = flag.String("address", ":8942", "The address to expose Prometheus metrics.")
storage = flag.String("storage", "", `Specifies storage mode. Supported values: prometheus, checkpoint (default)`)
memorySaver = flag.Bool("memory-saver", false, `If true, only track pods which have an associated VPA`)
updateWorkerCount = flag.Int("update-worker-count", 10, "Number of concurrent workers to update VPA recommendations and checkpoints. When increasing this setting, make sure the client-side rate limits (`kube-api-qps` and `kube-api-burst`) are either increased or turned off as well. Determines the minimum number of VPA checkpoints written per recommender loop.")
updateWorkerCount = flag.Int("update-worker-count", 10, "Number of concurrent workers to update VPA recommendations and checkpoints. When increasing this setting, make sure the client-side rate limits ('kube-api-qps' and 'kube-api-burst') are either increased or turned off as well. Determines the minimum number of VPA checkpoints written per recommender loop.")
)

// Prometheus history provider flags
@@ -37,6 +37,7 @@ package model

import (
"fmt"
"sync"
"time"

corev1 "k8s.io/api/core/v1"
@@ -105,16 +106,27 @@ type AggregateContainerState struct {
// we want to know if it needs recommendation, if the recommendation
// is present and if the automatic updates are on (are we able to
// apply the recommendation to the pods).
LastRecommendation corev1.ResourceList
lastRecommendation corev1.ResourceList
IsUnderVPA bool
UpdateMode *vpa_types.UpdateMode
ScalingMode *vpa_types.ContainerScalingMode
ControlledResources *[]ResourceName

mutex sync.RWMutex
}

// GetLastRecommendation returns last recorded recommendation.
func (a *AggregateContainerState) GetLastRecommendation() corev1.ResourceList {
return a.LastRecommendation
a.mutex.RLock()
defer a.mutex.RUnlock()
return a.lastRecommendation
}

// SetLastRecommendation sets the last recorded recommendation in a thread-safe manner.
func (a *AggregateContainerState) SetLastRecommendation(recommendation corev1.ResourceList) {
a.mutex.Lock()
defer a.mutex.Unlock()
a.lastRecommendation = recommendation
}

// NeedsRecommendation returns true if the state should have recommendation calculated.
@@ -147,7 +159,7 @@ func (a *AggregateContainerState) GetControlledResources() []ResourceName {
// a VPA object.
func (a *AggregateContainerState) MarkNotAutoscaled() {
a.IsUnderVPA = false
a.LastRecommendation = nil
a.lastRecommendation = nil
a.UpdateMode = nil
a.ScalingMode = nil
a.ControlledResources = nil
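As a side note, a stripped-down sketch (not part of this PR) of the race these accessors guard against, with a plain map standing in for corev1.ResourceList; running the unguarded equivalent under `go run -race` would report the concurrent access:

```go
package main

import "sync"

// state is a cut-down stand-in for AggregateContainerState; the map stands in
// for the corev1.ResourceList that lastRecommendation actually holds.
type state struct {
	mutex              sync.RWMutex
	lastRecommendation map[string]string
}

// GetLastRecommendation returns the last recorded recommendation under a read lock.
func (s *state) GetLastRecommendation() map[string]string {
	s.mutex.RLock()
	defer s.mutex.RUnlock()
	return s.lastRecommendation
}

// SetLastRecommendation replaces the last recorded recommendation under a write lock.
func (s *state) SetLastRecommendation(r map[string]string) {
	s.mutex.Lock()
	defer s.mutex.Unlock()
	s.lastRecommendation = r
}

func main() {
	s := &state{}
	var wg sync.WaitGroup
	// Several concurrent "update workers" writing new recommendations...
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			s.SetLastRecommendation(map[string]string{"cpu": "100m"})
		}()
	}
	// ...while another goroutine reads the current one.
	wg.Add(1)
	go func() {
		defer wg.Done()
		_ = s.GetLastRecommendation()
	}()
	wg.Wait()
}
```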
6 changes: 6 additions & 0 deletions vertical-pod-autoscaler/pkg/recommender/model/cluster.go
@@ -19,6 +19,7 @@ package model
import (
"context"
"fmt"
"sync"
"time"

apiv1 "k8s.io/api/core/v1"
@@ -82,6 +83,9 @@ type clusterState struct {

lastAggregateContainerStateGC time.Time
gcInterval time.Duration

// Mutex to protect concurrent access to maps
mutex sync.RWMutex
}

// StateMapSize is the number of pods being tracked by the VPA
@@ -464,6 +468,8 @@ func (cluster *clusterState) getContributiveAggregateStateKeys(ctx context.Conte
// keep track of empty recommendations and log information about them
// periodically.
func (cluster *clusterState) RecordRecommendation(vpa *Vpa, now time.Time) error {
cluster.mutex.Lock()
defer cluster.mutex.Unlock()
if vpa.Recommendation != nil && len(vpa.Recommendation.ContainerRecommendations) > 0 {
delete(cluster.emptyVPAs, vpa.ID)
return nil
4 changes: 2 additions & 2 deletions vertical-pod-autoscaler/pkg/recommender/model/vpa.go
@@ -166,8 +166,8 @@ func (vpa *Vpa) UpdateRecommendation(recommendation *vpa_types.RecommendedPodRes
for _, containerRecommendation := range recommendation.ContainerRecommendations {
for container, state := range vpa.aggregateContainerStates {
if container.ContainerName() == containerRecommendation.ContainerName {
metrics_quality.ObserveRecommendationChange(state.LastRecommendation, containerRecommendation.UncappedTarget, vpa.UpdateMode, vpa.PodCount)
state.LastRecommendation = containerRecommendation.UncappedTarget
metrics_quality.ObserveRecommendationChange(state.GetLastRecommendation(), containerRecommendation.UncappedTarget, vpa.UpdateMode, vpa.PodCount)
state.SetLastRecommendation(containerRecommendation.UncappedTarget)
}
}
}
6 changes: 3 additions & 3 deletions vertical-pod-autoscaler/pkg/recommender/model/vpa_test.go
@@ -193,7 +193,7 @@ func TestUpdateRecommendation(t *testing.T) {
for container, rec := range tc.containers {
state := &AggregateContainerState{}
if rec != nil {
state.LastRecommendation = corev1.ResourceList{
state.lastRecommendation = corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse(rec.cpu),
corev1.ResourceMemory: resource.MustParse(rec.mem),
}
@@ -209,9 +209,9 @@
for key, state := range vpa.aggregateContainerStates {
expected, ok := tc.expectedLast[key.ContainerName()]
if !ok {
assert.Nil(t, state.LastRecommendation)
assert.Nil(t, state.lastRecommendation)
} else {
assert.Equal(t, expected, state.LastRecommendation)
assert.Equal(t, expected, state.lastRecommendation)
}
}
})
@@ -21,6 +21,7 @@ import (
"fmt"
"net/http"
"strconv"
"sync"
"time"

"github.com/prometheus/client_golang/prometheus"
@@ -119,7 +120,8 @@ type objectCounterKey struct {

// ObjectCounter helps split all VPA objects into buckets
type ObjectCounter struct {
cnt map[objectCounterKey]int
cnt map[objectCounterKey]int
mutex sync.Mutex
}

// Register initializes all metrics for VPA Recommender
@@ -189,7 +191,9 @@ func (oc *ObjectCounter) Add(vpa *model.Vpa) {
matchesPods: vpa.HasMatchedPods(),
unsupportedConfig: vpa.Conditions.ConditionActive(vpa_types.ConfigUnsupported),
}
oc.mutex.Lock()
oc.cnt[key]++
oc.mutex.Unlock()
Comment on lines +194 to +196
Member Author


I looked at changing this to use atomic.AddInt64().

It works, but my concern is that we would need to guarantee that every possible key already exists in the map. This is possible by pre-filling the map with all possible combinations; however, we would then need to remember to update this file each time a new type is added, e.g.:

modes = []string{
string(vpa_types.UpdateModeOff),
string(vpa_types.UpdateModeInitial),
string(vpa_types.UpdateModeRecreate),
string(vpa_types.UpdateModeAuto),
}

The problem is that there doesn't seem to be a good way to dynamically generate that list in the VPA.
What we may be able to do is write a generator script that keeps this file up to date, and run it in CI.
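
Roughly, the pre-filled-map-plus-atomics variant I had in mind would look something like this (type and field names are illustrative, not the actual ObjectCounter code):

```go
package main

import "sync/atomic"

// bucketKey is an illustrative stand-in for objectCounterKey.
type bucketKey struct {
	updateMode     string
	hasMatchedPods bool
}

// counter pre-fills every possible key up front, so Add never writes to the
// map itself and a per-bucket atomic increment is safe without a mutex.
type counter struct {
	cnt map[bucketKey]*int64
}

func newCounter() *counter {
	// This list is the part that would have to be kept in sync with vpa_types
	// by hand (or by a generator script run in CI).
	modes := []string{"Off", "Initial", "Recreate", "Auto"}
	c := &counter{cnt: map[bucketKey]*int64{}}
	for _, m := range modes {
		for _, matched := range []bool{true, false} {
			c.cnt[bucketKey{updateMode: m, hasMatchedPods: matched}] = new(int64)
		}
	}
	return c
}

// Add is safe for concurrent use because the map is read-only after newCounter.
// It would panic on a key that was never pre-filled, which is exactly the risk
// described above.
func (c *counter) Add(k bucketKey) {
	atomic.AddInt64(c.cnt[k], 1)
}

func main() {
	c := newCounter()
	c.Add(bucketKey{updateMode: "Auto", hasMatchedPods: true})
}
```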

}

// Observe passes all the computed bucket values to metrics