Skip to content

Commit

Permalink
Refactor the condition and status update
Browse files Browse the repository at this point in the history
  • Loading branch information
Jeffwan committed Sep 24, 2024
1 parent f3f992a commit a7fa056
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 100 deletions.
140 changes: 42 additions & 98 deletions pkg/controller/modeladapter/modeladapter_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,47 +217,36 @@ func (r *ModelAdapterReconciler) Reconcile(ctx context.Context, req ctrl.Request
return ctrl.Result{}, nil
}

return r.DoReconcile(ctx, req, modelAdapter)
}

func (r *ModelAdapterReconciler) DoReconcile(ctx context.Context, req ctrl.Request, instance *modelv1alpha1.ModelAdapter) (ctrl.Result, error) {
// Let's set the initial status when no status is available
if instance.Status.Conditions == nil || len(instance.Status.Conditions) == 0 {
instance.Status.Phase = modelv1alpha1.ModelAdapterPending
meta.SetStatusCondition(&instance.Status.Conditions, metav1.Condition{
Type: string(modelv1alpha1.ModelAdapterConditionTypeInitialized),
Status: metav1.ConditionUnknown,
Reason: "Reconciling",
Message: "Starting reconciliation",
LastTransitionTime: metav1.Now()})

if err := r.Status().Update(ctx, instance); err != nil {
klog.ErrorS(err, "Failed to update ModelAdapter status", "modelAdapter", klog.KObj(instance))
if modelAdapter.Status.Conditions == nil || len(modelAdapter.Status.Conditions) == 0 {
modelAdapter.Status.Phase = modelv1alpha1.ModelAdapterPending
condition := NewCondition(string(modelv1alpha1.ModelAdapterConditionTypeInitialized), metav1.ConditionUnknown,
"Reconciling", "Starting reconciliation")
if err := r.updateStatus(ctx, modelAdapter, condition); err != nil {
klog.ErrorS(err, "Failed to update ModelAdapter status", "modelAdapter", klog.KObj(modelAdapter))
return reconcile.Result{}, err
}

// re-fetch the custom resource after updating the status to avoid 409 error here.
if err := r.Get(ctx, req.NamespacedName, instance); err != nil {
if err := r.Get(ctx, req.NamespacedName, modelAdapter); err != nil {
klog.Error(err, "Failed to re-fetch modelAdapter")
return ctrl.Result{}, err
}
}
return r.DoReconcile(ctx, req, modelAdapter)
}

func (r *ModelAdapterReconciler) DoReconcile(ctx context.Context, req ctrl.Request, instance *modelv1alpha1.ModelAdapter) (ctrl.Result, error) {
oldInstance := instance.DeepCopy()

// Step 0: Validate ModelAdapter configurations
if err := validateModelAdapter(instance); err != nil {
klog.Error(err, "Failed to validate the ModelAdapter")

instance.Status.Phase = modelv1alpha1.ModelAdapterFailed
meta.SetStatusCondition(&instance.Status.Conditions, metav1.Condition{
Type: string(modelv1alpha1.ModelAdapterConditionTypeResourceCreated),
Status: metav1.ConditionFalse,
Reason: "ValidationFailed",
Message: "ModelAdapter resource is not valid",
LastTransitionTime: metav1.Now()})

if updateErr := r.Status().Update(ctx, instance); updateErr != nil {
condition := NewCondition(string(modelv1alpha1.ModelAdapterConditionTypeResourceCreated), metav1.ConditionFalse,
"ValidationFailed", "ModelAdapter resource is not valid")
if updateErr := r.updateStatus(ctx, instance, condition); updateErr != nil {
klog.ErrorS(err, "Failed to update ModelAdapter status", "modelAdapter", klog.KObj(instance))
return reconcile.Result{}, updateErr
}
Expand All @@ -278,14 +267,9 @@ func (r *ModelAdapterReconciler) DoReconcile(ctx context.Context, req ctrl.Reque
// TODO: make constraints like a framework and provides the validate() interface etc.
if len(backendPods) == 0 || len(backendPods) < int(*instance.Spec.Replicas) {
instance.Status.Phase = modelv1alpha1.ModelAdapterFailed
meta.SetStatusCondition(&instance.Status.Conditions, metav1.Condition{
Type: string(modelv1alpha1.ModelAdapterConditionTypeResourceCreated),
Status: metav1.ConditionFalse,
Reason: "ValidationFailed",
Message: "ModelAdapter pod selector can not find any pods or pod count is less than required replicas",
LastTransitionTime: metav1.Now()})

if updateErr := r.Status().Update(ctx, instance); updateErr != nil {
condition := NewCondition(string(modelv1alpha1.ModelAdapterConditionTypeResourceCreated), metav1.ConditionFalse,
"ValidationFailed", "ModelAdapter pod selector can not find any pods or pod count is less than required replicas")
if updateErr := r.updateStatus(ctx, instance, condition); updateErr != nil {
klog.ErrorS(err, "Failed to update ModelAdapter status", "modelAdapter", klog.KObj(instance))
return reconcile.Result{}, updateErr
}
Expand Down Expand Up @@ -339,18 +323,13 @@ func (r *ModelAdapterReconciler) DoReconcile(ctx context.Context, req ctrl.Reque
return ctrl.Result{RequeueAfter: defaultRequeueDuration}, err
}

instance.Status.Phase = modelv1alpha1.ModelAdapterScheduling
podNames := ExtractPodNames(selectedPods)
instance.Status.Instances = podNames
meta.SetStatusCondition(&instance.Status.Conditions, metav1.Condition{
Type: string(modelv1alpha1.ModelAdapterConditionTypeSelectorMatched),
Status: metav1.ConditionTrue,
Reason: "Reconciling",
Message: fmt.Sprintf("ModelAdapter %s has been allocated to pod %s", klog.KObj(instance), podNames),
LastTransitionTime: metav1.Now(),
})

if err := r.Status().Update(ctx, instance); err != nil {
instance.Status.Phase = modelv1alpha1.ModelAdapterScheduling
instance.Status.Instances = podNames
condition := NewCondition(string(modelv1alpha1.ModelAdapterConditionTypeSelectorMatched), metav1.ConditionTrue,
"Reconciling", fmt.Sprintf("ModelAdapter %s has been allocated to pod %s", klog.KObj(instance), podNames))
if err := r.updateStatus(ctx, instance, condition); err != nil {
klog.InfoS("Got error when updating status", "cluster name", req.Name, "error", err, "ModelAdapter", instance)
return ctrl.Result{RequeueAfter: defaultRequeueDuration}, err
}
Expand All @@ -362,20 +341,13 @@ func (r *ModelAdapterReconciler) DoReconcile(ctx context.Context, req ctrl.Reque
if err != nil {
return ctrl.Result{}, err
}
podNames := ExtractPodNames(podsWithModelAdapter)

instance.Status.Phase = modelv1alpha1.ModelAdapterScaling
podNames := ExtractPodNames(podsWithModelAdapter)
instance.Status.Instances = Difference(podNames, podUnloadedAdapters)
meta.SetStatusCondition(&instance.Status.Conditions, metav1.Condition{
Type: string(modelv1alpha1.ModelAdapterConditionTypeSelectorMatched),
Status: metav1.ConditionTrue,
Reason: "Reconciling",
Message: fmt.Sprintf("ModelAdapter %s has been allocated to pod %s", klog.KObj(instance), podNames),
LastTransitionTime: metav1.Now(),
})

if err := r.Status().Update(ctx, instance); err != nil {
klog.InfoS("Got error when updating status", "cluster name", req.Name, "error", err, "ModelAdapter", instance)
condition := NewCondition(string(modelv1alpha1.ModelAdapterConditionTypeSelectorMatched), metav1.ConditionTrue,
"Reconciling", fmt.Sprintf("ModelAdapter %s has been allocated to pod %s", klog.KObj(instance), podNames))
if err = r.updateStatus(ctx, instance, condition); err != nil {
return ctrl.Result{RequeueAfter: defaultRequeueDuration}, err
}

Expand Down Expand Up @@ -414,23 +386,18 @@ func (r *ModelAdapterReconciler) DoReconcile(ctx context.Context, req ctrl.Reque
if r.inconsistentModelAdapterStatus(oldInstance.Status, instance.Status) {
klog.InfoS("model adapter reconcile", "Update CR status", req.Name, "status", instance.Status)
instance.Status.Phase = modelv1alpha1.ModelAdapterRunning
if err = r.updateStatus(ctx, instance); err != nil {
condition := NewCondition(string(modelv1alpha1.ModelAdapterConditionReady), metav1.ConditionTrue,
"Reconciling", fmt.Sprintf("ModelAdapter %s is ready", klog.KObj(instance)))
if err = r.updateStatus(ctx, instance, condition); err != nil {
return reconcile.Result{}, fmt.Errorf("update modelAdapter status error: %v", err)
}
}

return ctrl.Result{}, nil
}

func (r *ModelAdapterReconciler) updateStatus(ctx context.Context, instance *modelv1alpha1.ModelAdapter) error {
meta.SetStatusCondition(&instance.Status.Conditions, metav1.Condition{
Type: string(modelv1alpha1.ModelAdapterConditionReady),
Status: metav1.ConditionTrue,
Reason: "Reconciling",
Message: fmt.Sprintf("ModelAdapter %s is ready", klog.KObj(instance)),
LastTransitionTime: metav1.Now(),
})

func (r *ModelAdapterReconciler) updateStatus(ctx context.Context, instance *modelv1alpha1.ModelAdapter, condition metav1.Condition) error {
meta.SetStatusCondition(&instance.Status.Conditions, condition)
return r.Status().Update(ctx, instance)
}

Expand All @@ -444,15 +411,9 @@ func (r *ModelAdapterReconciler) clearModelAdapterInstanceList(ctx context.Conte
}

instance.Status.Instances = newList
meta.SetStatusCondition(&instance.Status.Conditions, metav1.Condition{
Type: string(modelv1alpha1.ModelAdapterConditionCleanup),
Status: metav1.ConditionTrue,
Reason: "Reconciling",
Message: fmt.Sprintf("Pod (%s) can not be fetched for model adapter (%s), clean up the list", stalePodName, instance.Name),
LastTransitionTime: metav1.Now(),
})

if err := r.Status().Update(ctx, instance); err != nil {
condition := NewCondition(string(modelv1alpha1.ModelAdapterConditionCleanup), metav1.ConditionTrue,
"Reconciling", fmt.Sprintf("Pod (%s) can not be fetched for model adapter (%s), clean up the list", stalePodName, instance.Name))
if err := r.updateStatus(ctx, instance, condition); err != nil {
klog.Error(err, "Failed to update modelAdapter status")
return err
}
Expand Down Expand Up @@ -541,14 +502,9 @@ func (r *ModelAdapterReconciler) reconcileLoading(ctx context.Context, instance

// Update the instance status once all binding has been done.
instance.Status.Phase = modelv1alpha1.ModelAdapterBinding
meta.SetStatusCondition(&instance.Status.Conditions, metav1.Condition{
Type: string(modelv1alpha1.ModelAdapterConditionTypeScheduled),
Status: metav1.ConditionTrue,
Reason: "Reconciling",
Message: fmt.Sprintf("ModelAdapter %s is loaded", klog.KObj(instance)),
LastTransitionTime: metav1.Now(),
})
if err := r.Status().Update(ctx, instance); err != nil {
condition := NewCondition(string(modelv1alpha1.ModelAdapterConditionTypeScheduled), metav1.ConditionTrue,
"Reconciling", fmt.Sprintf("ModelAdapter %s is loaded", klog.KObj(instance)))
if err := r.updateStatus(ctx, instance, condition); err != nil {
klog.InfoS("Got error when updating status", "error", err, "ModelAdapter", instance)
return err
}
Expand Down Expand Up @@ -737,15 +693,9 @@ func (r *ModelAdapterReconciler) reconcileService(ctx context.Context, instance
svc, err := buildModelAdapterService(instance)
if err != nil {
klog.ErrorS(err, "Failed to define new Service resource for ModelAdapter")
meta.SetStatusCondition(&instance.Status.Conditions, metav1.Condition{
Type: string(modelv1alpha1.ModelAdapterConditionTypeResourceCreated),
Status: metav1.ConditionFalse,
Reason: "Reconciling",
Message: fmt.Sprintf("Failed to create Service for the custom resource (%s): (%s)", instance.Name, err),
LastTransitionTime: metav1.Now(),
})

if err := r.Status().Update(ctx, instance); err != nil {
condition := NewCondition(string(modelv1alpha1.ModelAdapterConditionTypeResourceCreated), metav1.ConditionFalse,
"Reconciling", fmt.Sprintf("Failed to create Service for the custom resource (%s): (%s)", instance.Name, err))
if err := r.updateStatus(ctx, instance, condition); err != nil {
klog.Error(err, "Failed to update modelAdapter status")
return ctrl.Result{}, err
}
Expand Down Expand Up @@ -785,15 +735,9 @@ func (r *ModelAdapterReconciler) reconcileEndpointSlice(ctx context.Context, ins
if err != nil {
klog.ErrorS(err, "Failed to define new EndpointSlice resource for ModelAdapter")
instance.Status.Phase = modelv1alpha1.ModelAdapterFailed
meta.SetStatusCondition(&instance.Status.Conditions, metav1.Condition{
Type: string(modelv1alpha1.ModelAdapterConditionTypeResourceCreated),
Status: metav1.ConditionFalse,
Reason: "Reconciling",
Message: fmt.Sprintf("Failed to create EndpointSlice for the custom resource (%s): (%s)", instance.Name, err),
LastTransitionTime: metav1.Now(),
})

if err := r.Status().Update(ctx, instance); err != nil {
condition := NewCondition(string(modelv1alpha1.ModelAdapterConditionTypeResourceCreated), metav1.ConditionFalse,
"Reconciling", fmt.Sprintf("Failed to create EndpointSlice for the custom resource (%s): (%s)", instance.Name, err))
if err := r.updateStatus(ctx, instance, condition); err != nil {
klog.Error(err, "Failed to update modelAdapter status")
return ctrl.Result{}, err
}
Expand Down
16 changes: 14 additions & 2 deletions pkg/controller/modeladapter/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,14 @@ package modeladapter
import (
"errors"
"fmt"

"net/url"
"os"
"strings"

corev1 "k8s.io/api/core/v1"

modelv1alpha1 "github.com/aibrix/aibrix/api/model/v1alpha1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func validateModelAdapter(instance *modelv1alpha1.ModelAdapter) error {
Expand Down Expand Up @@ -169,3 +170,14 @@ func Difference(a, b []string) []string {

return diff
}

// NewCondition creates a new replicaset condition.
func NewCondition(condType string, status metav1.ConditionStatus, reason, msg string) metav1.Condition {
return metav1.Condition{
Type: condType,
Status: status,
LastTransitionTime: metav1.Now(),
Reason: reason,
Message: msg,
}
}

0 comments on commit a7fa056

Please sign in to comment.