Skip to content

Commit

Permalink
Support pod autoscaler periodically check (#306)
Browse files Browse the repository at this point in the history
* Support pod autoscaler periodically check

* Fix the error case
  • Loading branch information
Jeffwan authored Oct 20, 2024
1 parent f19f5d8 commit d753c88
Showing 1 changed file with 73 additions and 10 deletions.
83 changes: 73 additions & 10 deletions pkg/controller/podautoscaler/podautoscaler_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (

"github.com/aibrix/aibrix/pkg/controller/podautoscaler/metrics"

scaler "github.com/aibrix/aibrix/pkg/controller/podautoscaler/scaler"
"github.com/aibrix/aibrix/pkg/controller/podautoscaler/scaler"
podutil "github.com/aibrix/aibrix/pkg/utils"

autoscalingv1alpha1 "github.com/aibrix/aibrix/api/autoscaling/v1alpha1"
Expand All @@ -43,9 +43,11 @@ import (
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"
)

var (
Expand All @@ -66,10 +68,12 @@ func Add(mgr manager.Manager) error {
func newReconciler(mgr manager.Manager) (reconcile.Reconciler, error) {
// Instantiate a new PodAutoscalerReconciler with the given manager's client and scheme
reconciler := &PodAutoscalerReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
EventRecorder: mgr.GetEventRecorderFor("PodAutoscaler"),
Mapper: mgr.GetRESTMapper(),
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
EventRecorder: mgr.GetEventRecorderFor("PodAutoscaler"),
Mapper: mgr.GetRESTMapper(),
resyncInterval: 30 * time.Second, // TODO: this should be override by an environment variable
eventCh: make(chan event.GenericEvent),
}

// During initialization, KNative passes a podCounter, which may be non-zero if the Scale object already exists.
Expand All @@ -91,14 +95,32 @@ func newReconciler(mgr manager.Manager) (reconcile.Reconciler, error) {

// add adds a new Controller to mgr with r as the reconcile.Reconciler
func add(mgr manager.Manager, r reconcile.Reconciler) error {
// Build raw source for periodical requeue events from event channel
reconciler := r.(*PodAutoscalerReconciler)
src := &source.Channel{
Source: reconciler.eventCh,
}

// Create a new controller managed by AIBrix manager, watching for changes to PodAutoscaler objects
// and HorizontalPodAutoscaler objects.
err := ctrl.NewControllerManagedBy(mgr).
For(&autoscalingv1alpha1.PodAutoscaler{}).
Watches(&autoscalingv2.HorizontalPodAutoscaler{}, &handler.EnqueueRequestForObject{}).
WatchesRawSource(src, &handler.EnqueueRequestForObject{}).
Complete(r)

klog.V(4).InfoS("Added AIBricks pod-autoscaler-controller successfully")
klog.InfoS("Added AIBricks pod-autoscaler-controller successfully")

errChan := make(chan error)
go reconciler.Run(context.Background(), errChan)
klog.InfoS("Run pod-autoscaler-controller periodical syncs successfully")

go func() {
for err := range errChan {
klog.Error(err, "Run function returned an error")
}
}()

return err
}

Expand All @@ -107,10 +129,12 @@ var _ reconcile.Reconciler = &PodAutoscalerReconciler{}
// PodAutoscalerReconciler reconciles a PodAutoscaler object
type PodAutoscalerReconciler struct {
client.Client
Scheme *runtime.Scheme
EventRecorder record.EventRecorder
Mapper apimeta.RESTMapper
Autoscaler scaler.Scaler
Scheme *runtime.Scheme
EventRecorder record.EventRecorder
Mapper apimeta.RESTMapper
Autoscaler scaler.Scaler
resyncInterval time.Duration
eventCh chan event.GenericEvent
}

//+kubebuilder:rbac:groups=autoscaling.aibrix.ai,resources=podautoscalers,verbs=get;list;watch;create;update;patch;delete
Expand Down Expand Up @@ -166,6 +190,45 @@ func (r *PodAutoscalerReconciler) Reconcile(ctx context.Context, req ctrl.Reques
return ctrl.Result{}, nil
}

func (r *PodAutoscalerReconciler) Run(ctx context.Context, errChan chan<- error) {
ticker := time.NewTicker(r.resyncInterval)
defer ticker.Stop()
defer close(r.eventCh)

for {
select {
case <-ticker.C:
klog.Info("enqueue all autoscalers")
// periodically sync all autoscaling objects
if err := r.enqueuePodAutoscalers(ctx); err != nil {
klog.ErrorS(err, "Failed to enqueue pod autoscalers")
errChan <- err
}
case <-ctx.Done():
klog.Info("context done, stopping running the loop")
errChan <- ctx.Err()
return
}
}
}

func (r *PodAutoscalerReconciler) enqueuePodAutoscalers(ctx context.Context) error {
podAutoscalerLists := &autoscalingv2.HorizontalPodAutoscalerList{}
opts := client.MatchingFields{}
if err := r.List(ctx, podAutoscalerLists, opts); err != nil {
return err
}
for _, pa := range podAutoscalerLists.Items {
// Let's operate the queue and just enqueue the object, that should be ok.
e := event.GenericEvent{
Object: &pa,
}
r.eventCh <- e
}

return nil
}

// checkValidAutoscalingStrategy checks if a string is in a list of valid strategies
func checkValidAutoscalingStrategy(strategy autoscalingv1alpha1.ScalingStrategyType) bool {
validStrategies := []autoscalingv1alpha1.ScalingStrategyType{autoscalingv1alpha1.HPA, autoscalingv1alpha1.APA, autoscalingv1alpha1.KPA}
Expand Down

0 comments on commit d753c88

Please sign in to comment.