Skip to content

Commit

Permalink
Add finalizer and handle the model unload requests (#152)
Browse files Browse the repository at this point in the history
* Add complete finalizer handling process

* Make changes to pass integration test

* Use pod ip to invoke the unload endpoint
  • Loading branch information
Jeffwan authored Sep 10, 2024
1 parent 705f6b6 commit a60653a
Show file tree
Hide file tree
Showing 3 changed files with 107 additions and 27 deletions.
8 changes: 3 additions & 5 deletions docs/tutorial/lora/model_adapter.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,11 @@ apiVersion: model.aibrix.ai/v1alpha1
kind: ModelAdapter
metadata:
name: lora-1
namespace: default
namespace: aibrix-system
spec:
baseModel: llama2-70b
podSelector:
matchLabels:
model.aibrix.ai: llama2-70b
additionalConfig:
# could be model artifact etc.
modelArtifact: yard1/llama-2-7b-sql-lora-test
schedulerName: default-model-adapter-scheduler
artifactURL: huggingface://yard1/llama-2-7b-sql-lora-test
schedulerName: default
124 changes: 103 additions & 21 deletions pkg/controller/modeladapter/modeladapter_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@ import (
"net/http"
"time"

"sigs.k8s.io/controller-runtime/pkg/handler"

modelv1alpha1 "github.com/aibrix/aibrix/api/model/v1alpha1"
"github.com/aibrix/aibrix/pkg/cache"
"github.com/aibrix/aibrix/pkg/controller/modeladapter/scheduling"
Expand All @@ -47,13 +45,14 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

const (
//ControllerUIDLabelKey = "model-adapter-controller-uid"
ModelAdapterFinalizer = "model-adapter-finalizer"
ModelAdapterFinalizer = "modeladapter.aibrix.ai/finalizer"
)

var (
Expand Down Expand Up @@ -183,6 +182,41 @@ func (r *ModelAdapterReconciler) Reconcile(ctx context.Context, req ctrl.Request
return reconcile.Result{}, err
}

if modelAdapter.ObjectMeta.DeletionTimestamp.IsZero() {
// the object is not being deleted, so if it does not have the finalizer,
// then lets add the finalizer and update the object.
if !controllerutil.ContainsFinalizer(modelAdapter, ModelAdapterFinalizer) {
klog.Info("Adding finalizer for ModelAdapter")
if ok := controllerutil.AddFinalizer(modelAdapter, ModelAdapterFinalizer); !ok {
klog.Error("Failed to add finalizer for ModelAdapter")
return ctrl.Result{Requeue: true}, nil
}
if err := r.Update(ctx, modelAdapter); err != nil {
klog.Error("Failed to update custom resource to add finalizer")
return ctrl.Result{}, err
}
}
} else {
// the object is being deleted
if controllerutil.ContainsFinalizer(modelAdapter, ModelAdapterFinalizer) {
// the finalizer is present, so let's unload lora from those inference engines
if err := r.unloadModelAdapter(modelAdapter); err != nil {
// if fail to delete unload lora here, return the error so it can be retried.
return ctrl.Result{}, err
}
if ok := controllerutil.RemoveFinalizer(modelAdapter, ModelAdapterFinalizer); !ok {
klog.Error("Failed to remove finalizer for ModelAdapter")
return ctrl.Result{Requeue: true}, nil
}
if err := r.Update(ctx, modelAdapter); err != nil {
klog.Error("Failed to update custom resource to remove finalizer")
return ctrl.Result{}, err
}
}
// Stop reconciliation as the item is being deleted
return ctrl.Result{}, nil
}

return r.DoReconcile(ctx, req, modelAdapter)
}

Expand All @@ -209,23 +243,6 @@ func (r *ModelAdapterReconciler) DoReconcile(ctx context.Context, req ctrl.Reque
}
}

// TODO: better handle finalizer here. Then we can define some operations which should
// occur before the custom resource to be deleted.
// TODO: handle DeletionTimeStamp later.
if controllerutil.ContainsFinalizer(instance, ModelAdapterFinalizer) {
klog.Info("Adding finalizer for ModelAdapter")

if ok := controllerutil.AddFinalizer(instance, ModelAdapterFinalizer); !ok {
klog.Error("Failed to add finalizer for ModelAdapter")
return ctrl.Result{Requeue: true}, nil
}

if err := r.Update(ctx, instance); err != nil {
klog.Error("Failed to update custom resource to add finalizer")
return ctrl.Result{}, err
}
}

oldInstance := instance.DeepCopy()

// Step 0: Validate ModelAdapter configurations
Expand Down Expand Up @@ -405,7 +422,7 @@ func (r *ModelAdapterReconciler) reconcileLoading(ctx context.Context, instance
key := "DEBUG_MODE"
value, exists := getEnvKey(key)
host := fmt.Sprintf("http://%s:8000", pod.Status.PodIP)
if exists && value == "1" {
if exists && value == "on" {
// 30080 is the nodePort of the base model service.
host = fmt.Sprintf("http://%s:30080", "localhost")
}
Expand Down Expand Up @@ -528,6 +545,71 @@ func (r *ModelAdapterReconciler) loadModelAdapter(host string, instance *modelv1

return nil
}

// unloadModelAdapter unloads the loras from inference engines
func (r *ModelAdapterReconciler) unloadModelAdapter(instance *modelv1alpha1.ModelAdapter) error {
if len(instance.Status.Instances) == 0 {
klog.Warning("model adapter has not been deployed to any pods yet, skip unloading")
return nil
}

// TODO:(jiaxin.shan) Support multiple instances

podName := instance.Status.Instances[0]
targetPod := &corev1.Pod{}
if err := r.Get(context.TODO(), types.NamespacedName{
Namespace: instance.Namespace,
Name: podName,
}, targetPod); err != nil {
if apierrors.IsNotFound(err) {
// since the pod doesn't exist, unload is unnecessary
return nil
}
klog.Warning("Error getting Pod from lora instance list", err)
return err
}

payload := map[string]string{
"lora_name": instance.Name,
}
payloadBytes, err := json.Marshal(payload)
if err != nil {
return err
}

url := fmt.Sprintf("http://%s:%d/v1/unload_lora_adapter", targetPod.Status.PodIP, 8000)
key := "DEBUG_MODE"
value, exists := getEnvKey(key)
if exists && value == "on" {
// 30080 is the nodePort of the base model service.
url = "http://localhost:30080/v1/unload_lora_adapter"
}

req, err := http.NewRequest("POST", url, bytes.NewBuffer(payloadBytes))
if err != nil {
return err
}
req.Header.Set("Content-Type", "application/json")

client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
return err
}
defer func() {
if err := resp.Body.Close(); err != nil {
klog.InfoS("Error closing response body:", err)
}
}()

if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated {
body, _ := io.ReadAll(resp.Body)
return fmt.Errorf("failed to unload LoRA adapter: %s", body)
}

return nil
}

func (r *ModelAdapterReconciler) updateModelAdapterState(ctx context.Context, instance *modelv1alpha1.ModelAdapter, phase modelv1alpha1.ModelAdapterPhase) error {
if instance.Status.Phase == phase {
return nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/modeladapter/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func validateModelAdapter(instance *modelv1alpha1.ModelAdapter) error {

// validateArtifactURL checks if the ArtifactURL has a valid schema (s3://, gcs://, huggingface://, https://)
func validateArtifactURL(artifactURL string) error {
allowedSchemes := []string{"s3://", "gcs://", "huggingface://"}
allowedSchemes := []string{"s3://", "gcs://", "huggingface://", "hf://"}
for _, scheme := range allowedSchemes {
if strings.HasPrefix(artifactURL, scheme) {
return nil
Expand Down

0 comments on commit a60653a

Please sign in to comment.