From a60653a0b44f27d65aab771288908a44bd6bb683 Mon Sep 17 00:00:00 2001 From: Jiaxin Shan Date: Tue, 10 Sep 2024 15:01:56 -0700 Subject: [PATCH] Add finalizer and handle the model unload requests (#152) * Add complete finalizer handling process * Make changes to pass integration test * Use pod ip to invoke the unload endpoint --- docs/tutorial/lora/model_adapter.yaml | 8 +- .../modeladapter/modeladapter_controller.go | 124 +++++++++++++++--- pkg/controller/modeladapter/utils.go | 2 +- 3 files changed, 107 insertions(+), 27 deletions(-) diff --git a/docs/tutorial/lora/model_adapter.yaml b/docs/tutorial/lora/model_adapter.yaml index 3fd07ddf..389cc22b 100644 --- a/docs/tutorial/lora/model_adapter.yaml +++ b/docs/tutorial/lora/model_adapter.yaml @@ -2,13 +2,11 @@ apiVersion: model.aibrix.ai/v1alpha1 kind: ModelAdapter metadata: name: lora-1 - namespace: default + namespace: aibrix-system spec: baseModel: llama2-70b podSelector: matchLabels: model.aibrix.ai: llama2-70b - additionalConfig: - # could be model artifact etc. - modelArtifact: yard1/llama-2-7b-sql-lora-test - schedulerName: default-model-adapter-scheduler + artifactURL: huggingface://yard1/llama-2-7b-sql-lora-test + schedulerName: default diff --git a/pkg/controller/modeladapter/modeladapter_controller.go b/pkg/controller/modeladapter/modeladapter_controller.go index 86444d9a..6b1a1f1f 100644 --- a/pkg/controller/modeladapter/modeladapter_controller.go +++ b/pkg/controller/modeladapter/modeladapter_controller.go @@ -26,8 +26,6 @@ import ( "net/http" "time" - "sigs.k8s.io/controller-runtime/pkg/handler" - modelv1alpha1 "github.com/aibrix/aibrix/api/model/v1alpha1" "github.com/aibrix/aibrix/pkg/cache" "github.com/aibrix/aibrix/pkg/controller/modeladapter/scheduling" @@ -47,13 +45,14 @@ import ( ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/reconcile" ) const ( //ControllerUIDLabelKey = "model-adapter-controller-uid" - ModelAdapterFinalizer = "model-adapter-finalizer" + ModelAdapterFinalizer = "modeladapter.aibrix.ai/finalizer" ) var ( @@ -183,6 +182,41 @@ func (r *ModelAdapterReconciler) Reconcile(ctx context.Context, req ctrl.Request return reconcile.Result{}, err } + if modelAdapter.ObjectMeta.DeletionTimestamp.IsZero() { + // the object is not being deleted, so if it does not have the finalizer, + // then lets add the finalizer and update the object. + if !controllerutil.ContainsFinalizer(modelAdapter, ModelAdapterFinalizer) { + klog.Info("Adding finalizer for ModelAdapter") + if ok := controllerutil.AddFinalizer(modelAdapter, ModelAdapterFinalizer); !ok { + klog.Error("Failed to add finalizer for ModelAdapter") + return ctrl.Result{Requeue: true}, nil + } + if err := r.Update(ctx, modelAdapter); err != nil { + klog.Error("Failed to update custom resource to add finalizer") + return ctrl.Result{}, err + } + } + } else { + // the object is being deleted + if controllerutil.ContainsFinalizer(modelAdapter, ModelAdapterFinalizer) { + // the finalizer is present, so let's unload lora from those inference engines + if err := r.unloadModelAdapter(modelAdapter); err != nil { + // if fail to delete unload lora here, return the error so it can be retried. + return ctrl.Result{}, err + } + if ok := controllerutil.RemoveFinalizer(modelAdapter, ModelAdapterFinalizer); !ok { + klog.Error("Failed to remove finalizer for ModelAdapter") + return ctrl.Result{Requeue: true}, nil + } + if err := r.Update(ctx, modelAdapter); err != nil { + klog.Error("Failed to update custom resource to remove finalizer") + return ctrl.Result{}, err + } + } + // Stop reconciliation as the item is being deleted + return ctrl.Result{}, nil + } + return r.DoReconcile(ctx, req, modelAdapter) } @@ -209,23 +243,6 @@ func (r *ModelAdapterReconciler) DoReconcile(ctx context.Context, req ctrl.Reque } } - // TODO: better handle finalizer here. Then we can define some operations which should - // occur before the custom resource to be deleted. - // TODO: handle DeletionTimeStamp later. - if controllerutil.ContainsFinalizer(instance, ModelAdapterFinalizer) { - klog.Info("Adding finalizer for ModelAdapter") - - if ok := controllerutil.AddFinalizer(instance, ModelAdapterFinalizer); !ok { - klog.Error("Failed to add finalizer for ModelAdapter") - return ctrl.Result{Requeue: true}, nil - } - - if err := r.Update(ctx, instance); err != nil { - klog.Error("Failed to update custom resource to add finalizer") - return ctrl.Result{}, err - } - } - oldInstance := instance.DeepCopy() // Step 0: Validate ModelAdapter configurations @@ -405,7 +422,7 @@ func (r *ModelAdapterReconciler) reconcileLoading(ctx context.Context, instance key := "DEBUG_MODE" value, exists := getEnvKey(key) host := fmt.Sprintf("http://%s:8000", pod.Status.PodIP) - if exists && value == "1" { + if exists && value == "on" { // 30080 is the nodePort of the base model service. host = fmt.Sprintf("http://%s:30080", "localhost") } @@ -528,6 +545,71 @@ func (r *ModelAdapterReconciler) loadModelAdapter(host string, instance *modelv1 return nil } + +// unloadModelAdapter unloads the loras from inference engines +func (r *ModelAdapterReconciler) unloadModelAdapter(instance *modelv1alpha1.ModelAdapter) error { + if len(instance.Status.Instances) == 0 { + klog.Warning("model adapter has not been deployed to any pods yet, skip unloading") + return nil + } + + // TODO:(jiaxin.shan) Support multiple instances + + podName := instance.Status.Instances[0] + targetPod := &corev1.Pod{} + if err := r.Get(context.TODO(), types.NamespacedName{ + Namespace: instance.Namespace, + Name: podName, + }, targetPod); err != nil { + if apierrors.IsNotFound(err) { + // since the pod doesn't exist, unload is unnecessary + return nil + } + klog.Warning("Error getting Pod from lora instance list", err) + return err + } + + payload := map[string]string{ + "lora_name": instance.Name, + } + payloadBytes, err := json.Marshal(payload) + if err != nil { + return err + } + + url := fmt.Sprintf("http://%s:%d/v1/unload_lora_adapter", targetPod.Status.PodIP, 8000) + key := "DEBUG_MODE" + value, exists := getEnvKey(key) + if exists && value == "on" { + // 30080 is the nodePort of the base model service. + url = "http://localhost:30080/v1/unload_lora_adapter" + } + + req, err := http.NewRequest("POST", url, bytes.NewBuffer(payloadBytes)) + if err != nil { + return err + } + req.Header.Set("Content-Type", "application/json") + + client := &http.Client{} + resp, err := client.Do(req) + if err != nil { + return err + } + defer func() { + if err := resp.Body.Close(); err != nil { + klog.InfoS("Error closing response body:", err) + } + }() + + if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated { + body, _ := io.ReadAll(resp.Body) + return fmt.Errorf("failed to unload LoRA adapter: %s", body) + } + + return nil +} + func (r *ModelAdapterReconciler) updateModelAdapterState(ctx context.Context, instance *modelv1alpha1.ModelAdapter, phase modelv1alpha1.ModelAdapterPhase) error { if instance.Status.Phase == phase { return nil diff --git a/pkg/controller/modeladapter/utils.go b/pkg/controller/modeladapter/utils.go index 3eb9a365..04b3b168 100644 --- a/pkg/controller/modeladapter/utils.go +++ b/pkg/controller/modeladapter/utils.go @@ -70,7 +70,7 @@ func validateModelAdapter(instance *modelv1alpha1.ModelAdapter) error { // validateArtifactURL checks if the ArtifactURL has a valid schema (s3://, gcs://, huggingface://, https://) func validateArtifactURL(artifactURL string) error { - allowedSchemes := []string{"s3://", "gcs://", "huggingface://"} + allowedSchemes := []string{"s3://", "gcs://", "huggingface://", "hf://"} for _, scheme := range allowedSchemes { if strings.HasPrefix(artifactURL, scheme) { return nil