diff --git a/vertical-pod-autoscaler/pkg/recommender/input/cluster_feeder.go b/vertical-pod-autoscaler/pkg/recommender/input/cluster_feeder.go index 48397729ee96..c130b230ae33 100644 --- a/vertical-pod-autoscaler/pkg/recommender/input/cluster_feeder.go +++ b/vertical-pod-autoscaler/pkg/recommender/input/cluster_feeder.go @@ -81,6 +81,7 @@ type ClusterStateFeederFactory struct { KubeClient kube_client.Interface MetricsClient metrics.MetricsClient VpaCheckpointClient vpa_api.VerticalPodAutoscalerCheckpointsGetter + VpaCheckpointLister vpa_lister.VerticalPodAutoscalerCheckpointLister VpaLister vpa_lister.VerticalPodAutoscalerLister PodLister v1lister.PodLister OOMObserver oom.Observer @@ -99,6 +100,7 @@ func (m ClusterStateFeederFactory) Make() *clusterStateFeeder { metricsClient: m.MetricsClient, oomChan: m.OOMObserver.GetObservedOomsChannel(), vpaCheckpointClient: m.VpaCheckpointClient, + vpaCheckpointLister: m.VpaCheckpointLister, vpaLister: m.VpaLister, clusterState: m.ClusterState, specClient: spec.NewSpecClient(m.PodLister), @@ -206,6 +208,7 @@ type clusterStateFeeder struct { metricsClient metrics.MetricsClient oomChan <-chan oom.OomInfo vpaCheckpointClient vpa_api.VerticalPodAutoscalerCheckpointsGetter + vpaCheckpointLister vpa_lister.VerticalPodAutoscalerCheckpointLister vpaLister vpa_lister.VerticalPodAutoscalerLister clusterState model.ClusterState selectorFetcher target.VpaTargetSelectorFetcher @@ -267,25 +270,29 @@ func (feeder *clusterStateFeeder) InitFromCheckpoints(ctx context.Context) { klog.V(3).InfoS("Initializing VPA from checkpoints") feeder.LoadVPAs(ctx) + klog.V(3).InfoS("Fetching VPA checkpoints") + checkpointList, err := feeder.vpaCheckpointLister.List(labels.Everything()) + if err != nil { + klog.ErrorS(err, "Cannot list VPA checkpoints") + } + namespaces := make(map[string]bool) for _, v := range feeder.clusterState.VPAs() { namespaces[v.ID.Namespace] = true } for namespace := range namespaces { - klog.V(3).InfoS("Fetching checkpoints", "namespace", namespace) - checkpointList, err := feeder.vpaCheckpointClient.VerticalPodAutoscalerCheckpoints(namespace).List(ctx, metav1.ListOptions{}) - if err != nil { - klog.ErrorS(err, "Cannot list VPA checkpoints", "namespace", namespace) + if feeder.shouldIgnoreNamespace(namespace) { + klog.V(3).InfoS("Skipping loading VPA Checkpoints from namespace.", "namespace", namespace, "vpaObjectNamespace", feeder.vpaObjectNamespace, "ignoredNamespaces", feeder.ignoredNamespaces) + continue } - for _, checkpoint := range checkpointList.Items { + for _, checkpoint := range checkpointList { klog.V(3).InfoS("Loading checkpoint for VPA", "checkpoint", klog.KRef(checkpoint.Namespace, checkpoint.Spec.VPAObjectName), "container", checkpoint.Spec.ContainerName) - err = feeder.setVpaCheckpoint(&checkpoint) + err = feeder.setVpaCheckpoint(checkpoint) if err != nil { klog.ErrorS(err, "Error while loading checkpoint") } - } } } @@ -345,11 +352,12 @@ func (feeder *clusterStateFeeder) shouldIgnoreNamespace(namespace string) bool { func (feeder *clusterStateFeeder) cleanupCheckpointsForNamespace(ctx context.Context, namespace string, allVPAKeys map[model.VpaID]bool) error { var err error - checkpointList, err := feeder.vpaCheckpointClient.VerticalPodAutoscalerCheckpoints(namespace).List(ctx, metav1.ListOptions{}) + checkpointList, err := feeder.vpaCheckpointLister.VerticalPodAutoscalerCheckpoints(namespace).List(labels.Everything()) + if err != nil { return err } - for _, checkpoint := range checkpointList.Items { + for _, checkpoint := range checkpointList { vpaID := model.VpaID{Namespace: checkpoint.Namespace, VpaName: checkpoint.Spec.VPAObjectName} if !allVPAKeys[vpaID] { if errFeeder := feeder.vpaCheckpointClient.VerticalPodAutoscalerCheckpoints(namespace).Delete(ctx, checkpoint.Name, metav1.DeleteOptions{}); errFeeder != nil { diff --git a/vertical-pod-autoscaler/pkg/recommender/input/cluster_feeder_test.go b/vertical-pod-autoscaler/pkg/recommender/input/cluster_feeder_test.go index f8a393128047..0c03a54f5435 100644 --- a/vertical-pod-autoscaler/pkg/recommender/input/cluster_feeder_test.go +++ b/vertical-pod-autoscaler/pkg/recommender/input/cluster_feeder_test.go @@ -859,11 +859,12 @@ func TestFilterVPAsIgnoreNamespaces(t *testing.T) { func TestCanCleanupCheckpoints(t *testing.T) { _, tctx := ktesting.NewTestContext(t) client := fake.NewSimpleClientset() + namespace := "testNamespace" - _, err := client.CoreV1().Namespaces().Create(tctx, &v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: "testNamespace"}}, metav1.CreateOptions{}) + _, err := client.CoreV1().Namespaces().Create(tctx, &v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: namespace}}, metav1.CreateOptions{}) assert.NoError(t, err) - vpaBuilder := test.VerticalPodAutoscaler().WithContainer("container").WithNamespace("testNamespace").WithTargetRef(&autoscalingv1.CrossVersionObjectReference{ + vpaBuilder := test.VerticalPodAutoscaler().WithContainer("container").WithNamespace(namespace).WithTargetRef(&autoscalingv1.CrossVersionObjectReference{ Kind: kind, Name: name1, APIVersion: apiVersion, @@ -878,22 +879,19 @@ func TestCanCleanupCheckpoints(t *testing.T) { vpaLister := &test.VerticalPodAutoscalerListerMock{} vpaLister.On("List").Return(vpas, nil) - checkpoints := &vpa_types.VerticalPodAutoscalerCheckpointList{ - Items: []vpa_types.VerticalPodAutoscalerCheckpoint{ - { - ObjectMeta: metav1.ObjectMeta{ - Namespace: "testNamespace", - Name: "nonExistentVPA", - }, - Spec: vpa_types.VerticalPodAutoscalerCheckpointSpec{ - VPAObjectName: "nonExistentVPA", - }, - }, + vpaCheckPoint := &vpa_types.VerticalPodAutoscalerCheckpoint{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: namespace, + Name: "nonExistentVPA", + }, + Spec: vpa_types.VerticalPodAutoscalerCheckpointSpec{ + VPAObjectName: "nonExistentVPA", }, } + vpacheckpoints := []*vpa_types.VerticalPodAutoscalerCheckpoint{vpaCheckPoint} for _, vpa := range vpas { - checkpoints.Items = append(checkpoints.Items, vpa_types.VerticalPodAutoscalerCheckpoint{ + vpacheckpoints = append(vpacheckpoints, &vpa_types.VerticalPodAutoscalerCheckpoint{ ObjectMeta: metav1.ObjectMeta{ Namespace: vpa.Namespace, Name: vpa.Name, @@ -904,23 +902,29 @@ func TestCanCleanupCheckpoints(t *testing.T) { }) } + // Create a mock checkpoint client to track deletions checkpointClient := &fakeautoscalingv1.FakeAutoscalingV1{Fake: &core.Fake{}} - checkpointClient.AddReactor("list", "verticalpodautoscalercheckpoints", func(action core.Action) (bool, runtime.Object, error) { - return true, checkpoints, nil - }) - + // Track deleted checkpoints deletedCheckpoints := []string{} checkpointClient.AddReactor("delete", "verticalpodautoscalercheckpoints", func(action core.Action) (bool, runtime.Object, error) { deleteAction := action.(core.DeleteAction) deletedCheckpoints = append(deletedCheckpoints, deleteAction.GetName()) - return true, nil, nil }) + // Create namespace lister mock that will return the checkpoint list + checkpointNamespaceLister := &test.VerticalPodAutoscalerCheckPointListerMock{} + checkpointNamespaceLister.On("List").Return(vpacheckpoints, nil) + + // Create main checkpoint mock that will return the namespace lister + checkpointLister := &test.VerticalPodAutoscalerCheckPointListerMock{} + checkpointLister.On("VerticalPodAutoscalerCheckpoints", namespace).Return(checkpointNamespaceLister) + feeder := clusterStateFeeder{ coreClient: client.CoreV1(), vpaLister: vpaLister, vpaCheckpointClient: checkpointClient, + vpaCheckpointLister: checkpointLister, clusterState: model.NewClusterState(testGcPeriod), recommenderName: "default", } diff --git a/vertical-pod-autoscaler/pkg/recommender/main.go b/vertical-pod-autoscaler/pkg/recommender/main.go index ed7eb363efe5..4f0f2f997eaf 100644 --- a/vertical-pod-autoscaler/pkg/recommender/main.go +++ b/vertical-pod-autoscaler/pkg/recommender/main.go @@ -297,6 +297,7 @@ func run(ctx context.Context, healthCheck *metrics.HealthCheck, commonFlag *comm MetricsClient: input_metrics.NewMetricsClient(source, commonFlag.VpaObjectNamespace, "default-metrics-client"), VpaCheckpointClient: vpa_clientset.NewForConfigOrDie(config).AutoscalingV1(), VpaLister: vpa_api_util.NewVpasLister(vpa_clientset.NewForConfigOrDie(config), make(chan struct{}), commonFlag.VpaObjectNamespace), + VpaCheckpointLister: vpa_api_util.NewVpaCheckpointLister(vpa_clientset.NewForConfigOrDie(config), make(chan struct{}), commonFlag.VpaObjectNamespace), ClusterState: clusterState, SelectorFetcher: target.NewVpaTargetSelectorFetcher(config, kubeClient, factory), MemorySaveMode: *memorySaver, diff --git a/vertical-pod-autoscaler/pkg/utils/test/test_utils.go b/vertical-pod-autoscaler/pkg/utils/test/test_utils.go index 33beb7dadd44..8ae360177f31 100644 --- a/vertical-pod-autoscaler/pkg/utils/test/test_utils.go +++ b/vertical-pod-autoscaler/pkg/utils/test/test_utils.go @@ -200,6 +200,36 @@ func (m *VerticalPodAutoscalerListerMock) Get(name string) (*vpa_types.VerticalP return nil, fmt.Errorf("unimplemented") } +// VerticalPodAutoscalerCheckPointListerMock is a mock of VerticalPodAutoscalerCheckPointLister +type VerticalPodAutoscalerCheckPointListerMock struct { + mock.Mock +} + +// List is a mock implementation of VerticalPodAutoscalerLister.List +func (m *VerticalPodAutoscalerCheckPointListerMock) List(selector labels.Selector) (ret []*vpa_types.VerticalPodAutoscalerCheckpoint, err error) { + args := m.Called() + var returnArg []*vpa_types.VerticalPodAutoscalerCheckpoint + if args.Get(0) != nil { + returnArg = args.Get(0).([]*vpa_types.VerticalPodAutoscalerCheckpoint) + } + return returnArg, args.Error(1) +} + +// VerticalPodAutoscalerCheckpoints is a mock implementation of returning a lister for namespace. +func (m *VerticalPodAutoscalerCheckPointListerMock) VerticalPodAutoscalerCheckpoints(namespace string) vpa_lister.VerticalPodAutoscalerCheckpointNamespaceLister { + args := m.Called(namespace) + var returnArg vpa_lister.VerticalPodAutoscalerCheckpointNamespaceLister + if args.Get(0) != nil { + returnArg = args.Get(0).(vpa_lister.VerticalPodAutoscalerCheckpointNamespaceLister) + } + return returnArg +} + +// Get is not implemented for this mock +func (m *VerticalPodAutoscalerCheckPointListerMock) Get(name string) (*vpa_types.VerticalPodAutoscalerCheckpoint, error) { + return nil, fmt.Errorf("unimplemented") +} + // VerticalPodAutoscalerV1Beta1ListerMock is a mock of VerticalPodAutoscalerLister or // VerticalPodAutoscalerNamespaceLister - the crucial List method is the same. type VerticalPodAutoscalerV1Beta1ListerMock struct { diff --git a/vertical-pod-autoscaler/pkg/utils/vpa/api.go b/vertical-pod-autoscaler/pkg/utils/vpa/api.go index 402f2e7461f8..b30f3fc6039d 100644 --- a/vertical-pod-autoscaler/pkg/utils/vpa/api.go +++ b/vertical-pod-autoscaler/pkg/utils/vpa/api.go @@ -107,6 +107,36 @@ func NewVpasLister(vpaClient *vpa_clientset.Clientset, stopChannel <-chan struct return vpaLister } +// NewVpaCheckpointLister returns VerticalPodAutoscalerCheckpointLister configured to fetch all VPACheckpoint objects from namespace, +// set namespace to k8sapiv1.NamespaceAll to select all namespaces. +// The method blocks until vpaCheckpointLister is initially populated. +func NewVpaCheckpointLister(vpaClient *vpa_clientset.Clientset, stopChannel <-chan struct{}, namespace string) vpa_lister.VerticalPodAutoscalerCheckpointLister { + vpaListWatch := cache.NewListWatchFromClient(vpaClient.AutoscalingV1().RESTClient(), "verticalpodautoscalercheckpoints", namespace, fields.Everything()) + informerOptions := cache.InformerOptions{ + ObjectType: &vpa_types.VerticalPodAutoscalerCheckpoint{}, + ListerWatcher: vpaListWatch, + Handler: &cache.ResourceEventHandlerFuncs{}, + ResyncPeriod: 1 * time.Hour, + Indexers: cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, + } + + store, controller := cache.NewInformerWithOptions(informerOptions) + indexer, ok := store.(cache.Indexer) + if !ok { + klog.ErrorS(nil, "Expected Indexer, but got a Store that does not implement Indexer") + klog.FlushAndExit(klog.ExitFlushTimeout, 1) + } + vpaCheckpointLister := vpa_lister.NewVerticalPodAutoscalerCheckpointLister(indexer) + go controller.Run(stopChannel) + if !cache.WaitForCacheSync(stopChannel, controller.HasSynced) { + klog.ErrorS(nil, "Failed to sync VPA checkpoint cache during initialization") + klog.FlushAndExit(klog.ExitFlushTimeout, 1) + } else { + klog.InfoS("Initial VPA checkpoint synced successfully") + } + return vpaCheckpointLister +} + // PodMatchesVPA returns true iff the vpaWithSelector matches the Pod. func PodMatchesVPA(pod *core.Pod, vpaWithSelector *VpaWithSelector) bool { return PodLabelsMatchVPA(pod.Namespace, labels.Set(pod.GetLabels()), vpaWithSelector.Vpa.Namespace, vpaWithSelector.Selector)