Skip to content

Commit

Permalink
[Bug] fix: correct non-inherited context (#763)
Browse files Browse the repository at this point in the history
fix: correct non-inherited context

Signed-off-by: Abirdcfly <[email protected]>
  • Loading branch information
Abirdcfly authored Feb 28, 2025
1 parent c225afc commit f2e03aa
Show file tree
Hide file tree
Showing 8 changed files with 29 additions and 29 deletions.
8 changes: 4 additions & 4 deletions pkg/controller/modeladapter/modeladapter_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ func (r *ModelAdapterReconciler) Reconcile(ctx context.Context, req ctrl.Request
// the finalizer is present, so let's unload lora from those inference engines
// note: the base model pod could be deleted as well, so here we do best effort offloading
// we do not need to reconcile the object if it encounters the unloading error.
if err := r.unloadModelAdapter(modelAdapter); err != nil {
if err := r.unloadModelAdapter(ctx, modelAdapter); err != nil {
return ctrl.Result{}, err
}
if ok := controllerutil.RemoveFinalizer(modelAdapter, ModelAdapterFinalizer); !ok {
Expand Down Expand Up @@ -682,7 +682,7 @@ func (r *ModelAdapterReconciler) loadModelAdapter(url string, instance *modelv1a

// unloadModelAdapter unloads the loras from inference engines
// base model pod could be deleted, in this case, we just do optimistic unloading. It only returns some necessary errors and http errors should not be returned.
func (r *ModelAdapterReconciler) unloadModelAdapter(instance *modelv1alpha1.ModelAdapter) error {
func (r *ModelAdapterReconciler) unloadModelAdapter(ctx context.Context, instance *modelv1alpha1.ModelAdapter) error {
if len(instance.Status.Instances) == 0 {
klog.Warningf("model adapter %s/%s has not been deployed to any pods yet, skip unloading", instance.GetNamespace(), instance.GetName())
return nil
Expand All @@ -692,7 +692,7 @@ func (r *ModelAdapterReconciler) unloadModelAdapter(instance *modelv1alpha1.Mode

podName := instance.Status.Instances[0]
targetPod := &corev1.Pod{}
if err := r.Get(context.TODO(), types.NamespacedName{
if err := r.Get(ctx, types.NamespacedName{
Namespace: instance.Namespace,
Name: podName,
}, targetPod); err != nil {
Expand Down Expand Up @@ -803,7 +803,7 @@ func (r *ModelAdapterReconciler) reconcileEndpointSlice(ctx context.Context, ins
// TODO: do necessary refactor to support multiple lora instance
podName := instance.Status.Instances[0]
pod := &corev1.Pod{}
if err := r.Get(context.TODO(), types.NamespacedName{Namespace: instance.Namespace, Name: podName}, pod); err != nil {
if err := r.Get(ctx, types.NamespacedName{Namespace: instance.Namespace, Name: podName}, pod); err != nil {
if !apierrors.IsNotFound(err) {
klog.Warning("Error getting Pod from lora instance list", err)
return ctrl.Result{}, err
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/podautoscaler/podautoscaler_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -516,7 +516,7 @@ func (r *PodAutoscalerReconciler) scaleForResourceMappings(ctx context.Context,
scale.SetNamespace(namespace)
scale.SetName(name)

err := r.Get(context.TODO(), client.ObjectKey{Namespace: namespace, Name: name}, scale)
err := r.Get(ctx, client.ObjectKey{Namespace: namespace, Name: name}, scale)
if err == nil {
return scale, targetGR, nil
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/rayclusterfleet/progress.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func (r *RayClusterFleetReconciler) syncRolloutStatus(ctx context.Context, allRS

newDeployment := d
newDeployment.Status = newStatus
err := r.Status().Update(context.Background(), newDeployment)
err := r.Status().Update(ctx, newDeployment)
return err
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/controller/rayclusterfleet/rayclusterfleet_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func (r *RayClusterFleetReconciler) Reconcile(ctx context.Context, req ctrl.Requ
return ctrl.Result{}, err
}

clusterMap, err := r.getRayClusterMapForFleet(f, rsList)
clusterMap, err := r.getRayClusterMapForFleet(ctx, f, rsList)
if err != nil {
return ctrl.Result{}, err
}
Expand Down Expand Up @@ -209,7 +209,7 @@ func (r *RayClusterFleetReconciler) getReplicaSetsForFleet(ctx context.Context,
return ownedReplicaSets, nil
}

func (r *RayClusterFleetReconciler) getRayClusterMapForFleet(f *orchestrationv1alpha1.RayClusterFleet, rsList []*orchestrationv1alpha1.RayClusterReplicaSet) (map[types.UID][]*rayclusterv1.RayCluster, error) {
func (r *RayClusterFleetReconciler) getRayClusterMapForFleet(ctx context.Context, f *orchestrationv1alpha1.RayClusterFleet, rsList []*orchestrationv1alpha1.RayClusterReplicaSet) (map[types.UID][]*rayclusterv1.RayCluster, error) {
clusterList := &rayclusterv1.RayClusterList{}

// Get all RayClusters matches the fleet
Expand All @@ -218,7 +218,7 @@ func (r *RayClusterFleetReconciler) getRayClusterMapForFleet(f *orchestrationv1a
return nil, err
}

err = r.List(context.TODO(), clusterList, client.InNamespace(f.Namespace), client.MatchingLabelsSelector{Selector: selector})
err = r.List(ctx, clusterList, client.InNamespace(f.Namespace), client.MatchingLabelsSelector{Selector: selector})
if err != nil {
return nil, err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func (r *RayClusterReplicaSetReconciler) Reconcile(ctx context.Context, req ctrl

// status update if necessary
newStatus := calculateStatus(replicaset, filteredClusters, scaleError)
if err := r.updateReplicaSetStatus(replicaset, newStatus, rsKey); err != nil {
if err := r.updateReplicaSetStatus(ctx, replicaset, newStatus, rsKey); err != nil {
return reconcile.Result{}, err
}

Expand Down Expand Up @@ -202,7 +202,7 @@ func (r *RayClusterReplicaSetReconciler) scaleDown(ctx context.Context, replicas
}

// updateReplicaSetStatus attempts to update the Status.Replicas of the given ReplicaSet, with a single GET/PUT retry.
func (r *RayClusterReplicaSetReconciler) updateReplicaSetStatus(rs *orchestrationv1alpha1.RayClusterReplicaSet, newStatus orchestrationv1alpha1.RayClusterReplicaSetStatus, rsKey string) error {
func (r *RayClusterReplicaSetReconciler) updateReplicaSetStatus(ctx context.Context, rs *orchestrationv1alpha1.RayClusterReplicaSet, newStatus orchestrationv1alpha1.RayClusterReplicaSetStatus, rsKey string) error {
// Check if the expectations have been fulfilled for this ReplicaSet
if !r.Expectations.SatisfiedExpectations(rsKey) {
klog.V(4).Info("Expectations not yet fulfilled for ReplicaSet, delaying status update", "replicaSet", rsKey)
Expand All @@ -227,7 +227,7 @@ func (r *RayClusterReplicaSetReconciler) updateReplicaSetStatus(rs *orchestratio
// Update ReplicaSet status if necessary
newInstance := rs.DeepCopy()
newInstance.Status = newStatus
if err := r.Status().Update(context.Background(), newInstance); err != nil {
if err := r.Status().Update(ctx, newInstance); err != nil {
klog.ErrorS(err, "unable to update ReplicaSet status")
return err
}
Expand Down
14 changes: 7 additions & 7 deletions pkg/metadata/users.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,12 @@ func (s *httpServer) createUser(w http.ResponseWriter, r *http.Request) {
return
}

if utils.CheckUser(u, s.redisClient) {
if utils.CheckUser(r.Context(), u, s.redisClient) {
fmt.Fprintf(w, "User: %+v exists", u.Name)
return
}

if err := utils.SetUser(u, s.redisClient); err != nil {
if err := utils.SetUser(r.Context(), u, s.redisClient); err != nil {
http.Error(w, fmt.Sprintf("error occurred on creating user: %+v", err), http.StatusInternalServerError)
return
}
Expand All @@ -90,7 +90,7 @@ func (s *httpServer) readUser(w http.ResponseWriter, r *http.Request) {
return
}

user, err := utils.GetUser(u, s.redisClient)
user, err := utils.GetUser(r.Context(), u, s.redisClient)
if err != nil {
fmt.Fprint(w, "user does not exists")
return
Expand All @@ -114,12 +114,12 @@ func (s *httpServer) updateUser(w http.ResponseWriter, r *http.Request) {
return
}

if !utils.CheckUser(u, s.redisClient) {
if !utils.CheckUser(r.Context(), u, s.redisClient) {
fmt.Fprintf(w, "User: %+v does not exists", u.Name)
return
}

if err := utils.SetUser(u, s.redisClient); err != nil {
if err := utils.SetUser(r.Context(), u, s.redisClient); err != nil {
http.Error(w, fmt.Sprintf("error occurred on updating user: %+v", err), http.StatusInternalServerError)
return
}
Expand All @@ -142,12 +142,12 @@ func (s *httpServer) deleteUser(w http.ResponseWriter, r *http.Request) {
return
}

if !utils.CheckUser(u, s.redisClient) {
if !utils.CheckUser(r.Context(), u, s.redisClient) {
fmt.Fprintf(w, "User: %+v does not exists", u.Name)
return
}

if err := utils.DelUser(u, s.redisClient); err != nil {
if err := utils.DelUser(r.Context(), u, s.redisClient); err != nil {
http.Error(w, fmt.Sprintf("error occurred on deleting user: %+v", err), http.StatusInternalServerError)
return
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/plugins/gateway/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ import (
envoyTypePb "github.com/envoyproxy/go-control-plane/envoy/type/v3"
"github.com/vllm-project/aibrix/pkg/cache"
routing "github.com/vllm-project/aibrix/pkg/plugins/gateway/algorithms"
ratelimiter "github.com/vllm-project/aibrix/pkg/plugins/gateway/ratelimiter"
"github.com/vllm-project/aibrix/pkg/plugins/gateway/ratelimiter"
"github.com/vllm-project/aibrix/pkg/utils"
healthPb "google.golang.org/grpc/health/grpc_health_v1"
)
Expand Down Expand Up @@ -254,7 +254,7 @@ func (s *Server) HandleRequestHeaders(ctx context.Context, requestID string, req
}

if username != "" {
user, err = utils.GetUser(utils.User{Name: username}, s.redisClient)
user, err = utils.GetUser(ctx, utils.User{Name: username}, s.redisClient)
if err != nil {
klog.ErrorS(err, "unable to process user info", "requestID", requestID, "username", username)
return generateErrorResponse(
Expand Down
16 changes: 8 additions & 8 deletions pkg/utils/users.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,17 @@ type User struct {
Tpm int64 `json:"tpm"`
}

func CheckUser(u User, redisClient *redis.Client) bool {
val, err := redisClient.Exists(context.Background(), genKey(u.Name)).Result()
func CheckUser(ctx context.Context, u User, redisClient *redis.Client) bool {
val, err := redisClient.Exists(ctx, genKey(u.Name)).Result()
if err != nil {
return false
}

return val != 0
}

func GetUser(u User, redisClient *redis.Client) (User, error) {
val, err := redisClient.Get(context.Background(), genKey(u.Name)).Result()
func GetUser(ctx context.Context, u User, redisClient *redis.Client) (User, error) {
val, err := redisClient.Get(ctx, genKey(u.Name)).Result()
if err != nil {
return User{}, err
}
Expand All @@ -53,7 +53,7 @@ func GetUser(u User, redisClient *redis.Client) (User, error) {
return *user, nil
}

func SetUser(u User, redisClient *redis.Client) error {
func SetUser(ctx context.Context, u User, redisClient *redis.Client) error {
if u.Rpm < 0 || u.Tpm < 0 {
return fmt.Errorf("rpm or tpm can not negative")
}
Expand All @@ -63,11 +63,11 @@ func SetUser(u User, redisClient *redis.Client) error {
return err
}

return redisClient.Set(context.Background(), genKey(u.Name), string(b), 0).Err()
return redisClient.Set(ctx, genKey(u.Name), string(b), 0).Err()
}

func DelUser(u User, redisClient *redis.Client) error {
return redisClient.Del(context.Background(), genKey(u.Name)).Err()
func DelUser(ctx context.Context, u User, redisClient *redis.Client) error {
return redisClient.Del(ctx, genKey(u.Name)).Err()
}

func genKey(s string) string {
Expand Down

0 comments on commit f2e03aa

Please sign in to comment.