From d753c88e2d112a3b2750b0dd843c4d340df213cd Mon Sep 17 00:00:00 2001 From: Jiaxin Shan Date: Sun, 20 Oct 2024 08:54:46 -0700 Subject: [PATCH] Support pod autoscaler periodically check (#306) * Support pod autoscaler periodically check * Fix the error case --- .../podautoscaler/podautoscaler_controller.go | 83 ++++++++++++++++--- 1 file changed, 73 insertions(+), 10 deletions(-) diff --git a/pkg/controller/podautoscaler/podautoscaler_controller.go b/pkg/controller/podautoscaler/podautoscaler_controller.go index 6df1cf06..834818e5 100644 --- a/pkg/controller/podautoscaler/podautoscaler_controller.go +++ b/pkg/controller/podautoscaler/podautoscaler_controller.go @@ -23,7 +23,7 @@ import ( "github.com/aibrix/aibrix/pkg/controller/podautoscaler/metrics" - scaler "github.com/aibrix/aibrix/pkg/controller/podautoscaler/scaler" + "github.com/aibrix/aibrix/pkg/controller/podautoscaler/scaler" podutil "github.com/aibrix/aibrix/pkg/utils" autoscalingv1alpha1 "github.com/aibrix/aibrix/api/autoscaling/v1alpha1" @@ -43,9 +43,11 @@ import ( "k8s.io/klog/v2" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/reconcile" + "sigs.k8s.io/controller-runtime/pkg/source" ) var ( @@ -66,10 +68,12 @@ func Add(mgr manager.Manager) error { func newReconciler(mgr manager.Manager) (reconcile.Reconciler, error) { // Instantiate a new PodAutoscalerReconciler with the given manager's client and scheme reconciler := &PodAutoscalerReconciler{ - Client: mgr.GetClient(), - Scheme: mgr.GetScheme(), - EventRecorder: mgr.GetEventRecorderFor("PodAutoscaler"), - Mapper: mgr.GetRESTMapper(), + Client: mgr.GetClient(), + Scheme: mgr.GetScheme(), + EventRecorder: mgr.GetEventRecorderFor("PodAutoscaler"), + Mapper: mgr.GetRESTMapper(), + resyncInterval: 30 * time.Second, // TODO: this should be override by an environment variable + eventCh: make(chan event.GenericEvent), } // During initialization, KNative passes a podCounter, which may be non-zero if the Scale object already exists. @@ -91,14 +95,32 @@ func newReconciler(mgr manager.Manager) (reconcile.Reconciler, error) { // add adds a new Controller to mgr with r as the reconcile.Reconciler func add(mgr manager.Manager, r reconcile.Reconciler) error { + // Build raw source for periodical requeue events from event channel + reconciler := r.(*PodAutoscalerReconciler) + src := &source.Channel{ + Source: reconciler.eventCh, + } + // Create a new controller managed by AIBrix manager, watching for changes to PodAutoscaler objects // and HorizontalPodAutoscaler objects. err := ctrl.NewControllerManagedBy(mgr). For(&autoscalingv1alpha1.PodAutoscaler{}). Watches(&autoscalingv2.HorizontalPodAutoscaler{}, &handler.EnqueueRequestForObject{}). + WatchesRawSource(src, &handler.EnqueueRequestForObject{}). Complete(r) - klog.V(4).InfoS("Added AIBricks pod-autoscaler-controller successfully") + klog.InfoS("Added AIBricks pod-autoscaler-controller successfully") + + errChan := make(chan error) + go reconciler.Run(context.Background(), errChan) + klog.InfoS("Run pod-autoscaler-controller periodical syncs successfully") + + go func() { + for err := range errChan { + klog.Error(err, "Run function returned an error") + } + }() + return err } @@ -107,10 +129,12 @@ var _ reconcile.Reconciler = &PodAutoscalerReconciler{} // PodAutoscalerReconciler reconciles a PodAutoscaler object type PodAutoscalerReconciler struct { client.Client - Scheme *runtime.Scheme - EventRecorder record.EventRecorder - Mapper apimeta.RESTMapper - Autoscaler scaler.Scaler + Scheme *runtime.Scheme + EventRecorder record.EventRecorder + Mapper apimeta.RESTMapper + Autoscaler scaler.Scaler + resyncInterval time.Duration + eventCh chan event.GenericEvent } //+kubebuilder:rbac:groups=autoscaling.aibrix.ai,resources=podautoscalers,verbs=get;list;watch;create;update;patch;delete @@ -166,6 +190,45 @@ func (r *PodAutoscalerReconciler) Reconcile(ctx context.Context, req ctrl.Reques return ctrl.Result{}, nil } +func (r *PodAutoscalerReconciler) Run(ctx context.Context, errChan chan<- error) { + ticker := time.NewTicker(r.resyncInterval) + defer ticker.Stop() + defer close(r.eventCh) + + for { + select { + case <-ticker.C: + klog.Info("enqueue all autoscalers") + // periodically sync all autoscaling objects + if err := r.enqueuePodAutoscalers(ctx); err != nil { + klog.ErrorS(err, "Failed to enqueue pod autoscalers") + errChan <- err + } + case <-ctx.Done(): + klog.Info("context done, stopping running the loop") + errChan <- ctx.Err() + return + } + } +} + +func (r *PodAutoscalerReconciler) enqueuePodAutoscalers(ctx context.Context) error { + podAutoscalerLists := &autoscalingv2.HorizontalPodAutoscalerList{} + opts := client.MatchingFields{} + if err := r.List(ctx, podAutoscalerLists, opts); err != nil { + return err + } + for _, pa := range podAutoscalerLists.Items { + // Let's operate the queue and just enqueue the object, that should be ok. + e := event.GenericEvent{ + Object: &pa, + } + r.eventCh <- e + } + + return nil +} + // checkValidAutoscalingStrategy checks if a string is in a list of valid strategies func checkValidAutoscalingStrategy(strategy autoscalingv1alpha1.ScalingStrategyType) bool { validStrategies := []autoscalingv1alpha1.ScalingStrategyType{autoscalingv1alpha1.HPA, autoscalingv1alpha1.APA, autoscalingv1alpha1.KPA}