Skip to content

Switch VPA checkpoint to use a lister #8331

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 17 additions & 9 deletions vertical-pod-autoscaler/pkg/recommender/input/cluster_feeder.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ type ClusterStateFeederFactory struct {
KubeClient kube_client.Interface
MetricsClient metrics.MetricsClient
VpaCheckpointClient vpa_api.VerticalPodAutoscalerCheckpointsGetter
VpaCheckpointLister vpa_lister.VerticalPodAutoscalerCheckpointLister
VpaLister vpa_lister.VerticalPodAutoscalerLister
PodLister v1lister.PodLister
OOMObserver oom.Observer
Expand All @@ -99,6 +100,7 @@ func (m ClusterStateFeederFactory) Make() *clusterStateFeeder {
metricsClient: m.MetricsClient,
oomChan: m.OOMObserver.GetObservedOomsChannel(),
vpaCheckpointClient: m.VpaCheckpointClient,
vpaCheckpointLister: m.VpaCheckpointLister,
vpaLister: m.VpaLister,
clusterState: m.ClusterState,
specClient: spec.NewSpecClient(m.PodLister),
Expand Down Expand Up @@ -206,6 +208,7 @@ type clusterStateFeeder struct {
metricsClient metrics.MetricsClient
oomChan <-chan oom.OomInfo
vpaCheckpointClient vpa_api.VerticalPodAutoscalerCheckpointsGetter
vpaCheckpointLister vpa_lister.VerticalPodAutoscalerCheckpointLister
vpaLister vpa_lister.VerticalPodAutoscalerLister
clusterState model.ClusterState
selectorFetcher target.VpaTargetSelectorFetcher
Expand Down Expand Up @@ -267,25 +270,29 @@ func (feeder *clusterStateFeeder) InitFromCheckpoints(ctx context.Context) {
klog.V(3).InfoS("Initializing VPA from checkpoints")
feeder.LoadVPAs(ctx)

klog.V(3).InfoS("Fetching VPA checkpoints")
checkpointList, err := feeder.vpaCheckpointLister.List(labels.Everything())
if err != nil {
klog.ErrorS(err, "Cannot list VPA checkpoints")
}

namespaces := make(map[string]bool)
for _, v := range feeder.clusterState.VPAs() {
namespaces[v.ID.Namespace] = true
}

for namespace := range namespaces {
klog.V(3).InfoS("Fetching checkpoints", "namespace", namespace)
checkpointList, err := feeder.vpaCheckpointClient.VerticalPodAutoscalerCheckpoints(namespace).List(ctx, metav1.ListOptions{})
if err != nil {
klog.ErrorS(err, "Cannot list VPA checkpoints", "namespace", namespace)
if feeder.shouldIgnoreNamespace(namespace) {
klog.V(3).InfoS("Skipping loading VPA Checkpoints from namespace.", "namespace", namespace, "vpaObjectNamespace", feeder.vpaObjectNamespace, "ignoredNamespaces", feeder.ignoredNamespaces)
continue
}
for _, checkpoint := range checkpointList.Items {

for _, checkpoint := range checkpointList {
klog.V(3).InfoS("Loading checkpoint for VPA", "checkpoint", klog.KRef(checkpoint.Namespace, checkpoint.Spec.VPAObjectName), "container", checkpoint.Spec.ContainerName)
err = feeder.setVpaCheckpoint(&checkpoint)
err = feeder.setVpaCheckpoint(checkpoint)
if err != nil {
klog.ErrorS(err, "Error while loading checkpoint")
}

}
}
}
Expand Down Expand Up @@ -345,11 +352,12 @@ func (feeder *clusterStateFeeder) shouldIgnoreNamespace(namespace string) bool {

func (feeder *clusterStateFeeder) cleanupCheckpointsForNamespace(ctx context.Context, namespace string, allVPAKeys map[model.VpaID]bool) error {
var err error
checkpointList, err := feeder.vpaCheckpointClient.VerticalPodAutoscalerCheckpoints(namespace).List(ctx, metav1.ListOptions{})
checkpointList, err := feeder.vpaCheckpointLister.VerticalPodAutoscalerCheckpoints(namespace).List(labels.Everything())

if err != nil {
return err
}
for _, checkpoint := range checkpointList.Items {
for _, checkpoint := range checkpointList {
vpaID := model.VpaID{Namespace: checkpoint.Namespace, VpaName: checkpoint.Spec.VPAObjectName}
if !allVPAKeys[vpaID] {
if errFeeder := feeder.vpaCheckpointClient.VerticalPodAutoscalerCheckpoints(namespace).Delete(ctx, checkpoint.Name, metav1.DeleteOptions{}); errFeeder != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -859,11 +859,12 @@ func TestFilterVPAsIgnoreNamespaces(t *testing.T) {
func TestCanCleanupCheckpoints(t *testing.T) {
_, tctx := ktesting.NewTestContext(t)
client := fake.NewSimpleClientset()
namespace := "testNamespace"

_, err := client.CoreV1().Namespaces().Create(tctx, &v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: "testNamespace"}}, metav1.CreateOptions{})
_, err := client.CoreV1().Namespaces().Create(tctx, &v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: namespace}}, metav1.CreateOptions{})
assert.NoError(t, err)

vpaBuilder := test.VerticalPodAutoscaler().WithContainer("container").WithNamespace("testNamespace").WithTargetRef(&autoscalingv1.CrossVersionObjectReference{
vpaBuilder := test.VerticalPodAutoscaler().WithContainer("container").WithNamespace(namespace).WithTargetRef(&autoscalingv1.CrossVersionObjectReference{
Kind: kind,
Name: name1,
APIVersion: apiVersion,
Expand All @@ -878,22 +879,19 @@ func TestCanCleanupCheckpoints(t *testing.T) {
vpaLister := &test.VerticalPodAutoscalerListerMock{}
vpaLister.On("List").Return(vpas, nil)

checkpoints := &vpa_types.VerticalPodAutoscalerCheckpointList{
Items: []vpa_types.VerticalPodAutoscalerCheckpoint{
{
ObjectMeta: metav1.ObjectMeta{
Namespace: "testNamespace",
Name: "nonExistentVPA",
},
Spec: vpa_types.VerticalPodAutoscalerCheckpointSpec{
VPAObjectName: "nonExistentVPA",
},
},
vpaCheckPoint := &vpa_types.VerticalPodAutoscalerCheckpoint{
ObjectMeta: metav1.ObjectMeta{
Namespace: namespace,
Name: "nonExistentVPA",
},
Spec: vpa_types.VerticalPodAutoscalerCheckpointSpec{
VPAObjectName: "nonExistentVPA",
},
}
vpacheckpoints := []*vpa_types.VerticalPodAutoscalerCheckpoint{vpaCheckPoint}

for _, vpa := range vpas {
checkpoints.Items = append(checkpoints.Items, vpa_types.VerticalPodAutoscalerCheckpoint{
vpacheckpoints = append(vpacheckpoints, &vpa_types.VerticalPodAutoscalerCheckpoint{
ObjectMeta: metav1.ObjectMeta{
Namespace: vpa.Namespace,
Name: vpa.Name,
Expand All @@ -904,23 +902,29 @@ func TestCanCleanupCheckpoints(t *testing.T) {
})
}

// Create a mock checkpoint client to track deletions
checkpointClient := &fakeautoscalingv1.FakeAutoscalingV1{Fake: &core.Fake{}}
checkpointClient.AddReactor("list", "verticalpodautoscalercheckpoints", func(action core.Action) (bool, runtime.Object, error) {
return true, checkpoints, nil
})

// Track deleted checkpoints
deletedCheckpoints := []string{}
checkpointClient.AddReactor("delete", "verticalpodautoscalercheckpoints", func(action core.Action) (bool, runtime.Object, error) {
deleteAction := action.(core.DeleteAction)
deletedCheckpoints = append(deletedCheckpoints, deleteAction.GetName())

return true, nil, nil
})

// Create namespace lister mock that will return the checkpoint list
checkpointNamespaceLister := &test.VerticalPodAutoscalerCheckPointListerMock{}
checkpointNamespaceLister.On("List").Return(vpacheckpoints, nil)

// Create main checkpoint mock that will return the namespace lister
checkpointLister := &test.VerticalPodAutoscalerCheckPointListerMock{}
checkpointLister.On("VerticalPodAutoscalerCheckpoints", namespace).Return(checkpointNamespaceLister)

feeder := clusterStateFeeder{
coreClient: client.CoreV1(),
vpaLister: vpaLister,
vpaCheckpointClient: checkpointClient,
vpaCheckpointLister: checkpointLister,
clusterState: model.NewClusterState(testGcPeriod),
recommenderName: "default",
}
Expand Down
1 change: 1 addition & 0 deletions vertical-pod-autoscaler/pkg/recommender/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,7 @@ func run(ctx context.Context, healthCheck *metrics.HealthCheck, commonFlag *comm
MetricsClient: input_metrics.NewMetricsClient(source, commonFlag.VpaObjectNamespace, "default-metrics-client"),
VpaCheckpointClient: vpa_clientset.NewForConfigOrDie(config).AutoscalingV1(),
VpaLister: vpa_api_util.NewVpasLister(vpa_clientset.NewForConfigOrDie(config), make(chan struct{}), commonFlag.VpaObjectNamespace),
VpaCheckpointLister: vpa_api_util.NewVpaCheckpointLister(vpa_clientset.NewForConfigOrDie(config), make(chan struct{}), commonFlag.VpaObjectNamespace),
ClusterState: clusterState,
SelectorFetcher: target.NewVpaTargetSelectorFetcher(config, kubeClient, factory),
MemorySaveMode: *memorySaver,
Expand Down
30 changes: 30 additions & 0 deletions vertical-pod-autoscaler/pkg/utils/test/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,36 @@ func (m *VerticalPodAutoscalerListerMock) Get(name string) (*vpa_types.VerticalP
return nil, fmt.Errorf("unimplemented")
}

// VerticalPodAutoscalerCheckPointListerMock is a mock of VerticalPodAutoscalerCheckPointLister
type VerticalPodAutoscalerCheckPointListerMock struct {
mock.Mock
}

// List is a mock implementation of VerticalPodAutoscalerLister.List
func (m *VerticalPodAutoscalerCheckPointListerMock) List(selector labels.Selector) (ret []*vpa_types.VerticalPodAutoscalerCheckpoint, err error) {
args := m.Called()
var returnArg []*vpa_types.VerticalPodAutoscalerCheckpoint
if args.Get(0) != nil {
returnArg = args.Get(0).([]*vpa_types.VerticalPodAutoscalerCheckpoint)
}
return returnArg, args.Error(1)
}

// VerticalPodAutoscalerCheckpoints is a mock implementation of returning a lister for namespace.
func (m *VerticalPodAutoscalerCheckPointListerMock) VerticalPodAutoscalerCheckpoints(namespace string) vpa_lister.VerticalPodAutoscalerCheckpointNamespaceLister {
args := m.Called(namespace)
var returnArg vpa_lister.VerticalPodAutoscalerCheckpointNamespaceLister
if args.Get(0) != nil {
returnArg = args.Get(0).(vpa_lister.VerticalPodAutoscalerCheckpointNamespaceLister)
}
return returnArg
}

// Get is not implemented for this mock
func (m *VerticalPodAutoscalerCheckPointListerMock) Get(name string) (*vpa_types.VerticalPodAutoscalerCheckpoint, error) {
return nil, fmt.Errorf("unimplemented")
}

// VerticalPodAutoscalerV1Beta1ListerMock is a mock of VerticalPodAutoscalerLister or
// VerticalPodAutoscalerNamespaceLister - the crucial List method is the same.
type VerticalPodAutoscalerV1Beta1ListerMock struct {
Expand Down
30 changes: 30 additions & 0 deletions vertical-pod-autoscaler/pkg/utils/vpa/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,36 @@ func NewVpasLister(vpaClient *vpa_clientset.Clientset, stopChannel <-chan struct
return vpaLister
}

// NewVpaCheckpointLister returns VerticalPodAutoscalerCheckpointLister configured to fetch all VPACheckpoint objects from namespace,
// set namespace to k8sapiv1.NamespaceAll to select all namespaces.
// The method blocks until vpaCheckpointLister is initially populated.
func NewVpaCheckpointLister(vpaClient *vpa_clientset.Clientset, stopChannel <-chan struct{}, namespace string) vpa_lister.VerticalPodAutoscalerCheckpointLister {
vpaListWatch := cache.NewListWatchFromClient(vpaClient.AutoscalingV1().RESTClient(), "verticalpodautoscalercheckpoints", namespace, fields.Everything())
informerOptions := cache.InformerOptions{
ObjectType: &vpa_types.VerticalPodAutoscalerCheckpoint{},
ListerWatcher: vpaListWatch,
Handler: &cache.ResourceEventHandlerFuncs{},
ResyncPeriod: 1 * time.Hour,
Indexers: cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
}

store, controller := cache.NewInformerWithOptions(informerOptions)
indexer, ok := store.(cache.Indexer)
if !ok {
klog.ErrorS(nil, "Expected Indexer, but got a Store that does not implement Indexer")
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}
vpaCheckpointLister := vpa_lister.NewVerticalPodAutoscalerCheckpointLister(indexer)
go controller.Run(stopChannel)
if !cache.WaitForCacheSync(stopChannel, controller.HasSynced) {
klog.ErrorS(nil, "Failed to sync VPA checkpoint cache during initialization")
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
} else {
klog.InfoS("Initial VPA checkpoint synced successfully")
}
return vpaCheckpointLister
}

// PodMatchesVPA returns true iff the vpaWithSelector matches the Pod.
func PodMatchesVPA(pod *core.Pod, vpaWithSelector *VpaWithSelector) bool {
return PodLabelsMatchVPA(pod.Namespace, labels.Set(pod.GetLabels()), vpaWithSelector.Vpa.Namespace, vpaWithSelector.Selector)
Expand Down
Loading