diff --git a/pkg/cache/cache.go b/pkg/cache/cache.go index 2135623a..c0b9a6fd 100644 --- a/pkg/cache/cache.go +++ b/pkg/cache/cache.go @@ -19,7 +19,6 @@ package cache import ( "errors" "fmt" - "strings" "sync" crdinformers "github.com/aibrix/aibrix/pkg/client/informers/externalversions" @@ -41,16 +40,12 @@ var once sync.Once // type global type Cache struct { - mu sync.RWMutex - initialized bool - pods map[string]*v1.Pod - + mu sync.RWMutex + initialized bool + pods map[string]*v1.Pod podToModelMapping map[string]map[string]struct{} // pod_name: map[model_name]struct{} modelToPodMapping map[string]map[string]*v1.Pod // model_name: map[pod_name]*v1.Pod - - modelAdapterToPodMapping map[string][]string // TODO deprecate: model_adapter_name: []pods - podToModelAdapterMapping map[string]map[string]struct{} // TODO deprecate: pod_name: map[model_adapter_name]struct{} - podRequestTracker map[string]int + podRequestTracker map[string]int } var ( @@ -100,15 +95,11 @@ func NewCache(config *rest.Config, stopCh <-chan struct{}) *Cache { } instance = Cache{ - initialized: true, - pods: map[string]*v1.Pod{}, - + initialized: true, + pods: map[string]*v1.Pod{}, podToModelMapping: map[string]map[string]struct{}{}, modelToPodMapping: map[string]map[string]*v1.Pod{}, - - modelAdapterToPodMapping: map[string][]string{}, - podToModelAdapterMapping: map[string]map[string]struct{}{}, - podRequestTracker: map[string]int{}, + podRequestTracker: map[string]int{}, } if _, err := podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ @@ -144,7 +135,6 @@ func (c *Cache) addPod(obj interface{}) { c.pods[pod.Name] = pod c.addPodAndModelMapping(pod.Name, modelName) - c.podToModelAdapterMapping[pod.Name] = map[string]struct{}{} klog.Infof("POD CREATED: %s/%s", pod.Namespace, pod.Name) c.debugInfo() } @@ -196,9 +186,6 @@ func (c *Cache) addModelAdapter(obj interface{}) { c.addPodAndModelMapping(pod, model.Name) } - c.modelAdapterToPodMapping[model.Name] = model.Status.Instances - c.addModelAdapterMapping(model) - klog.Infof("MODELADAPTER CREATED: %s/%s", model.Namespace, model.Name) c.debugInfo() } @@ -218,10 +205,6 @@ func (c *Cache) updateModelAdapter(oldObj interface{}, newObj interface{}) { c.addPodAndModelMapping(pod, newModel.Name) } - c.modelAdapterToPodMapping[newModel.Name] = newModel.Status.Instances - c.deleteModelAdapterMapping(oldModel) - c.addModelAdapterMapping(newModel) - klog.Infof("MODELADAPTER UPDATED. %s/%s %s", oldModel.Namespace, oldModel.Name, newModel.Status.Phase) c.debugInfo() } @@ -235,9 +218,6 @@ func (c *Cache) deleteModelAdapter(obj interface{}) { c.deletePodAndModelMapping(pod, model.Name) } - delete(c.modelAdapterToPodMapping, model.Name) - c.deleteModelAdapterMapping(model) - klog.Infof("MODELADAPTER DELETED: %s/%s", model.Namespace, model.Name) c.debugInfo() } @@ -275,29 +255,6 @@ func (c *Cache) deletePodAndModelMapping(podName, modelName string) { delete(c.modelToPodMapping, modelName) } -func (c *Cache) addModelAdapterMapping(model *modelv1alpha1.ModelAdapter) { - for _, pod := range model.Status.Instances { - models, ok := c.podToModelAdapterMapping[pod] - if !ok { - c.podToModelAdapterMapping[pod] = map[string]struct{}{ - model.Name: {}, - } - continue - } - - models[model.Name] = struct{}{} - c.podToModelAdapterMapping[pod] = models - } -} - -func (c *Cache) deleteModelAdapterMapping(model *modelv1alpha1.ModelAdapter) { - for _, pod := range model.Status.Instances { - modelAdapters := c.podToModelAdapterMapping[pod] - delete(modelAdapters, model.Name) - c.podToModelAdapterMapping[pod] = modelAdapters - } -} - func (c *Cache) debugInfo() { for _, pod := range c.pods { klog.Info(pod.Name) @@ -316,23 +273,18 @@ func (c *Cache) debugInfo() { } klog.Infof("model: %s, pods: %s", modelName, podList) } +} - for model, instances := range c.modelAdapterToPodMapping { - klog.Infof("modelName: %s, instances: %v", model, instances) - } - - for pod, models := range c.podToModelAdapterMapping { - if !strings.HasPrefix(pod, "llama") { - continue - } - - modelsArr := []string{} - for m := range models { - modelsArr = append(modelsArr, m) - } +func (c *Cache) GetPod(podName string) (*v1.Pod, error) { + c.mu.RLock() + defer c.mu.RUnlock() - klog.Infof("podName: %s, modelAdapters: %v", pod, modelsArr) + pod, ok := c.pods[podName] + if !ok { + return nil, fmt.Errorf("pod does not exist in the cache: %s", podName) } + + return pod, nil } func (c *Cache) GetPods() map[string]*v1.Pod { @@ -342,24 +294,28 @@ func (c *Cache) GetPods() map[string]*v1.Pod { return c.pods } -func (c *Cache) GetPodsForModel(modelName string) map[string]*v1.Pod { +func (c *Cache) GetPodsForModel(modelName string) (map[string]*v1.Pod, error) { c.mu.RLock() defer c.mu.RUnlock() - pods := map[string]*v1.Pod{} - for podName, pod := range c.modelToPodMapping[modelName] { - klog.Info(podName) - pods[podName] = pod + podsMap, ok := c.modelToPodMapping[modelName] + if !ok { + return nil, fmt.Errorf("model does not exist in the cache: %s", modelName) } - return pods + return podsMap, nil } -func (c *Cache) GetPodToModelAdapterMapping() map[string]map[string]struct{} { +func (c *Cache) GetModelsForPod(podName string) (map[string]struct{}, error) { c.mu.RLock() defer c.mu.RUnlock() - return c.podToModelAdapterMapping + models, ok := c.podToModelMapping[podName] + if !ok { + return nil, fmt.Errorf("pod does not exist in the cache: %s", podName) + } + + return models, nil } func (c *Cache) IncrPodRequestCount(podName string) int { diff --git a/pkg/controller/modeladapter/scheduling/leastadapters.go b/pkg/controller/modeladapter/scheduling/leastadapters.go index f3cdf148..6b289cd1 100644 --- a/pkg/controller/modeladapter/scheduling/leastadapters.go +++ b/pkg/controller/modeladapter/scheduling/leastadapters.go @@ -18,7 +18,6 @@ package scheduling import ( "context" - "errors" "math" "github.com/aibrix/aibrix/pkg/cache" @@ -37,24 +36,20 @@ func NewLeastAdapters(c *cache.Cache) Scheduler { } func (r leastAdapters) SelectPod(ctx context.Context, pods []v1.Pod) (*v1.Pod, error) { - modelAdapterCountMin := math.MaxInt selectedPod := v1.Pod{} - podMap := r.cache.GetPods() - podToModelAdapterMapping := r.cache.GetPodToModelAdapterMapping() + modelAdapterCountMin := math.MaxInt for _, pod := range pods { - if _, ok := podMap[pod.Name]; !ok { - return nil, errors.New("pod not found in the cache") + models, err := r.cache.GetModelsForPod(pod.Name) + if err != nil { + return nil, err } - - modelAdapters := podToModelAdapterMapping[pod.Name] - if len(modelAdapters) < modelAdapterCountMin { + if len(models) < modelAdapterCountMin { selectedPod = pod - modelAdapterCountMin = len(modelAdapters) + modelAdapterCountMin = len(models) } } klog.Infof("pod selected with least model adapters: %s", selectedPod.Name) - return &selectedPod, nil } diff --git a/pkg/plugins/gateway/gateway.go b/pkg/plugins/gateway/gateway.go index 1424ad63..fd6f18f3 100644 --- a/pkg/plugins/gateway/gateway.go +++ b/pkg/plugins/gateway/gateway.go @@ -153,114 +153,66 @@ func (s *Server) HandleRequestHeaders(ctx context.Context, req *extProcPb.Proces klog.Infof("user: %v", user) code, err := s.checkRPM(ctx, user) if err != nil { - return &extProcPb.ProcessingResponse{ - Response: &extProcPb.ProcessingResponse_ImmediateResponse{ - ImmediateResponse: &extProcPb.ImmediateResponse{ - Status: &envoyTypePb.HttpStatus{ - Code: code, - }, - Body: err.Error(), - Headers: &extProcPb.HeaderMutation{ - SetHeaders: []*configPb.HeaderValueOption{ - { - Header: &configPb.HeaderValue{ - Key: "x-rpm-exceeded", - RawValue: []byte("true"), - }, - }, - }, - }, - }, - }, - }, user, targetPodIP + return generateErrorResponse( + code, + []*configPb.HeaderValueOption{{Header: &configPb.HeaderValue{ + Key: "x-rpm-exceeded", RawValue: []byte("true"), + }}}, + fmt.Sprintf("pre query: error on checking rpm: %v", err.Error())), user, targetPodIP } code, err = s.checkTPM(ctx, user) if err != nil { - return &extProcPb.ProcessingResponse{ - Response: &extProcPb.ProcessingResponse_ImmediateResponse{ - ImmediateResponse: &extProcPb.ImmediateResponse{ - Status: &envoyTypePb.HttpStatus{ - Code: code, - }, - Body: err.Error(), - Headers: &extProcPb.HeaderMutation{ - SetHeaders: []*configPb.HeaderValueOption{ - { - Header: &configPb.HeaderValue{ - Key: "x-tpm-exceeded", - RawValue: []byte("true"), - }, - }, - }, - }, - }, - }, - }, user, targetPodIP + return generateErrorResponse( + code, + []*configPb.HeaderValueOption{{Header: &configPb.HeaderValue{ + Key: "x-tpm-exceeded", RawValue: []byte("true"), + }}}, + fmt.Sprintf("pre query: error on checking tpm: %v", err.Error())), user, targetPodIP } - pods := s.cache.GetPodsForModel(model) - if len(pods) == 0 { - return &extProcPb.ProcessingResponse{ - Response: &extProcPb.ProcessingResponse_ImmediateResponse{ - ImmediateResponse: &extProcPb.ImmediateResponse{ - Status: &envoyTypePb.HttpStatus{ - Code: envoyTypePb.StatusCode_ServiceUnavailable, - }, - Body: "no models are deployed", - Headers: &extProcPb.HeaderMutation{ - SetHeaders: []*configPb.HeaderValueOption{ - { - Header: &configPb.HeaderValue{ - Key: "x-no-model-deployment", - RawValue: []byte("true"), - }, - }, - }, - }, - }, - }, - }, user, targetPodIP + pods, err := s.cache.GetPodsForModel(model) + if len(pods) == 0 || err != nil { + return generateErrorResponse( + code, + []*configPb.HeaderValueOption{{Header: &configPb.HeaderValue{ + Key: "x-no-model-deployment", RawValue: []byte("true"), + }}}, + fmt.Sprintf("pre query: no models are deployed: %v", err.Error())), user, targetPodIP } - targetPodIP, err = s.SelectTargetPod(ctx, routingStrategy, pods) - if err != nil { - return &extProcPb.ProcessingResponse{ - Response: &extProcPb.ProcessingResponse_ImmediateResponse{ - ImmediateResponse: &extProcPb.ImmediateResponse{ - Status: &envoyTypePb.HttpStatus{ - Code: envoyTypePb.StatusCode_InternalServerError, - }, - Body: fmt.Sprintf("error on selecting target pod: %v", err.Error()), - }, - }, - }, user, targetPodIP + targetPodIP, err = s.selectTargetPod(ctx, routingStrategy, pods) + if targetPodIP == "" || err != nil { + return generateErrorResponse( + code, + []*configPb.HeaderValueOption{{Header: &configPb.HeaderValue{ + Key: "x-select-target-pod", RawValue: []byte("true"), + }}}, + fmt.Sprintf("pre query: error on selecting target pod: %v", err.Error())), user, targetPodIP } - headers := []*configPb.HeaderValueOption{{ - Header: &configPb.HeaderValue{ - Key: "x-went-into-req-headers", - RawValue: []byte("true"), - }, - }} - if targetPodIP != "" { - headers = append(headers, &configPb.HeaderValueOption{ - Header: &configPb.HeaderValue{ - Key: "target-pod", - RawValue: []byte(targetPodIP), - }, - }) - - podRequestCounter := s.cache.IncrPodRequestCount(fmt.Sprintf("%v_REQUEST_COUNT", targetPodIP)) - klog.Infof("RequestStart: SelectedTargetPodIP: %s, PodRequestCount: %v", targetPodIP, podRequestCounter) - } + podRequestCounter := s.cache.IncrPodRequestCount(fmt.Sprintf("%v_REQUEST_COUNT", targetPodIP)) + klog.Infof("RequestStart: SelectedTargetPodIP: %s, PodRequestCount: %v", targetPodIP, podRequestCounter) resp := &extProcPb.ProcessingResponse{ Response: &extProcPb.ProcessingResponse_RequestHeaders{ RequestHeaders: &extProcPb.HeadersResponse{ Response: &extProcPb.CommonResponse{ HeaderMutation: &extProcPb.HeaderMutation{ - SetHeaders: headers, + SetHeaders: []*configPb.HeaderValueOption{ + { + Header: &configPb.HeaderValue{ + Key: "x-went-into-req-headers", + RawValue: []byte("true"), + }, + }, + { + Header: &configPb.HeaderValue{ + Key: "target-pod", + RawValue: []byte(targetPodIP), + }, + }, + }, }, ClearRouteCache: true, }, @@ -340,59 +292,46 @@ func (s *Server) HandleResponseBody(ctx context.Context, req *extProcPb.Processi r := req.Request b := r.(*extProcPb.ProcessingRequest_ResponseBody) + defer func() { + podRequestCounter := s.cache.DecrPodRequestCount(fmt.Sprintf("%v_REQUEST_COUNT", targetPodIP)) + klog.Infof("RequestEnd: SelectedTargetPodIP: %s, PodRequestCount: %v", targetPodIP, podRequestCounter) + }() + var res openai.CompletionResponse if err := json.Unmarshal(b.ResponseBody.Body, &res); err != nil { - return &extProcPb.ProcessingResponse{ - Response: &extProcPb.ProcessingResponse_ImmediateResponse{ - ImmediateResponse: &extProcPb.ImmediateResponse{ - Status: &envoyTypePb.HttpStatus{ - Code: envoyTypePb.StatusCode_InternalServerError, - }, - Body: err.Error(), - }, - }, - } + return generateErrorResponse( + envoyTypePb.StatusCode_InternalServerError, + []*configPb.HeaderValueOption{{Header: &configPb.HeaderValue{ + Key: "x-error-response-unmarshal", RawValue: []byte("true"), + }}}, + err.Error()) } rpm, err := s.ratelimiter.Incr(ctx, fmt.Sprintf("%v_RPM_CURRENT", user), 1) if err != nil { - return &extProcPb.ProcessingResponse{ - Response: &extProcPb.ProcessingResponse_ImmediateResponse{ - ImmediateResponse: &extProcPb.ImmediateResponse{ - Status: &envoyTypePb.HttpStatus{ - Code: envoyTypePb.StatusCode_InternalServerError, - }, - Body: fmt.Sprintf("post query: error on updating rpm: %v", err.Error()), - }, - }, - } + return generateErrorResponse( + envoyTypePb.StatusCode_InternalServerError, + []*configPb.HeaderValueOption{{Header: &configPb.HeaderValue{ + Key: "x-error-update-rpm", RawValue: []byte("true"), + }}}, + fmt.Sprintf("post query: error on updating rpm: %v", err.Error())) } tpm, err := s.ratelimiter.Incr(ctx, fmt.Sprintf("%v_TPM_CURRENT", user), int64(res.Usage.TotalTokens)) if err != nil { - return &extProcPb.ProcessingResponse{ - Response: &extProcPb.ProcessingResponse_ImmediateResponse{ - ImmediateResponse: &extProcPb.ImmediateResponse{ - Status: &envoyTypePb.HttpStatus{ - Code: envoyTypePb.StatusCode_InternalServerError, - }, - Details: err.Error(), - Body: "post query: error on updating tpm", - }, - }, - } + return generateErrorResponse( + envoyTypePb.StatusCode_InternalServerError, + []*configPb.HeaderValueOption{{Header: &configPb.HeaderValue{ + Key: "x-error-update-tpm", RawValue: []byte("true"), + }}}, + fmt.Sprintf("post query: error on updating tpm: %v", err.Error())) } klog.Infof("Updated RPM: %v, TPM: %v for user: %v", rpm, tpm, user) - if targetPodIP != "" { - podRequestCounter := s.cache.DecrPodRequestCount(fmt.Sprintf("%v_REQUEST_COUNT", targetPodIP)) - klog.Infof("RequestEnd: SelectedTargetPodIP: %s, PodRequestCount: %v", targetPodIP, podRequestCounter) - - podTpm, err := s.ratelimiter.Incr(ctx, fmt.Sprintf("%v_THROUGHPUT", targetPodIP), int64(res.Usage.TotalTokens)) - if err != nil { - klog.Error(err) - } else { - klog.Infof("RequestEnd: SelectedTargetPodIP: %s, PodThroughput: %v", targetPodIP, podTpm) - } + podTpm, err := s.ratelimiter.Incr(ctx, fmt.Sprintf("%v_THROUGHPUT", targetPodIP), int64(res.Usage.TotalTokens)) + if err != nil { + klog.Error(err) + } else { + klog.Infof("RequestEnd: SelectedTargetPodIP: %s, PodThroughput: %v", targetPodIP, podTpm) } return &extProcPb.ProcessingResponse{ @@ -463,7 +402,7 @@ func (s *Server) checkTPM(ctx context.Context, user string) (envoyTypePb.StatusC return envoyTypePb.StatusCode_OK, nil } -func (s *Server) SelectTargetPod(ctx context.Context, routingStrategy string, pods map[string]*v1.Pod) (string, error) { +func (s *Server) selectTargetPod(ctx context.Context, routingStrategy string, pods map[string]*v1.Pod) (string, error) { var route routing.Router switch routingStrategy { case "least-request": @@ -476,3 +415,19 @@ func (s *Server) SelectTargetPod(ctx context.Context, routingStrategy string, po return route.Get(ctx, pods) } + +func generateErrorResponse(statusCode envoyTypePb.StatusCode, headers []*configPb.HeaderValueOption, body string) *extProcPb.ProcessingResponse { + return &extProcPb.ProcessingResponse{ + Response: &extProcPb.ProcessingResponse_ImmediateResponse{ + ImmediateResponse: &extProcPb.ImmediateResponse{ + Status: &envoyTypePb.HttpStatus{ + Code: statusCode, + }, + Headers: &extProcPb.HeaderMutation{ + SetHeaders: headers, + }, + Body: body, + }, + }, + } +}