Skip to content

Commit

Permalink
nit: gateway error response code refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
varungupta authored and varungupta committed Sep 16, 2024
1 parent 9639b72 commit 77d5299
Show file tree
Hide file tree
Showing 3 changed files with 123 additions and 217 deletions.
100 changes: 28 additions & 72 deletions pkg/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package cache
import (
"errors"
"fmt"
"strings"
"sync"

crdinformers "github.com/aibrix/aibrix/pkg/client/informers/externalversions"
Expand All @@ -41,16 +40,12 @@ var once sync.Once

// type global
type Cache struct {
mu sync.RWMutex
initialized bool
pods map[string]*v1.Pod

mu sync.RWMutex
initialized bool
pods map[string]*v1.Pod
podToModelMapping map[string]map[string]struct{} // pod_name: map[model_name]struct{}
modelToPodMapping map[string]map[string]*v1.Pod // model_name: map[pod_name]*v1.Pod

modelAdapterToPodMapping map[string][]string // TODO deprecate: model_adapter_name: []pods
podToModelAdapterMapping map[string]map[string]struct{} // TODO deprecate: pod_name: map[model_adapter_name]struct{}
podRequestTracker map[string]int
podRequestTracker map[string]int
}

var (
Expand Down Expand Up @@ -100,15 +95,11 @@ func NewCache(config *rest.Config, stopCh <-chan struct{}) *Cache {
}

instance = Cache{
initialized: true,
pods: map[string]*v1.Pod{},

initialized: true,
pods: map[string]*v1.Pod{},
podToModelMapping: map[string]map[string]struct{}{},
modelToPodMapping: map[string]map[string]*v1.Pod{},

modelAdapterToPodMapping: map[string][]string{},
podToModelAdapterMapping: map[string]map[string]struct{}{},
podRequestTracker: map[string]int{},
podRequestTracker: map[string]int{},
}

if _, err := podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
Expand Down Expand Up @@ -144,7 +135,6 @@ func (c *Cache) addPod(obj interface{}) {

c.pods[pod.Name] = pod
c.addPodAndModelMapping(pod.Name, modelName)
c.podToModelAdapterMapping[pod.Name] = map[string]struct{}{}
klog.Infof("POD CREATED: %s/%s", pod.Namespace, pod.Name)
c.debugInfo()
}
Expand Down Expand Up @@ -196,9 +186,6 @@ func (c *Cache) addModelAdapter(obj interface{}) {
c.addPodAndModelMapping(pod, model.Name)
}

c.modelAdapterToPodMapping[model.Name] = model.Status.Instances
c.addModelAdapterMapping(model)

klog.Infof("MODELADAPTER CREATED: %s/%s", model.Namespace, model.Name)
c.debugInfo()
}
Expand All @@ -218,10 +205,6 @@ func (c *Cache) updateModelAdapter(oldObj interface{}, newObj interface{}) {
c.addPodAndModelMapping(pod, newModel.Name)
}

c.modelAdapterToPodMapping[newModel.Name] = newModel.Status.Instances
c.deleteModelAdapterMapping(oldModel)
c.addModelAdapterMapping(newModel)

klog.Infof("MODELADAPTER UPDATED. %s/%s %s", oldModel.Namespace, oldModel.Name, newModel.Status.Phase)
c.debugInfo()
}
Expand All @@ -235,9 +218,6 @@ func (c *Cache) deleteModelAdapter(obj interface{}) {
c.deletePodAndModelMapping(pod, model.Name)
}

delete(c.modelAdapterToPodMapping, model.Name)
c.deleteModelAdapterMapping(model)

klog.Infof("MODELADAPTER DELETED: %s/%s", model.Namespace, model.Name)
c.debugInfo()
}
Expand Down Expand Up @@ -275,29 +255,6 @@ func (c *Cache) deletePodAndModelMapping(podName, modelName string) {
delete(c.modelToPodMapping, modelName)
}

func (c *Cache) addModelAdapterMapping(model *modelv1alpha1.ModelAdapter) {
for _, pod := range model.Status.Instances {
models, ok := c.podToModelAdapterMapping[pod]
if !ok {
c.podToModelAdapterMapping[pod] = map[string]struct{}{
model.Name: {},
}
continue
}

models[model.Name] = struct{}{}
c.podToModelAdapterMapping[pod] = models
}
}

func (c *Cache) deleteModelAdapterMapping(model *modelv1alpha1.ModelAdapter) {
for _, pod := range model.Status.Instances {
modelAdapters := c.podToModelAdapterMapping[pod]
delete(modelAdapters, model.Name)
c.podToModelAdapterMapping[pod] = modelAdapters
}
}

func (c *Cache) debugInfo() {
for _, pod := range c.pods {
klog.Info(pod.Name)
Expand All @@ -316,23 +273,18 @@ func (c *Cache) debugInfo() {
}
klog.Infof("model: %s, pods: %s", modelName, podList)
}
}

for model, instances := range c.modelAdapterToPodMapping {
klog.Infof("modelName: %s, instances: %v", model, instances)
}

for pod, models := range c.podToModelAdapterMapping {
if !strings.HasPrefix(pod, "llama") {
continue
}

modelsArr := []string{}
for m := range models {
modelsArr = append(modelsArr, m)
}
func (c *Cache) GetPod(podName string) (*v1.Pod, error) {
c.mu.RLock()
defer c.mu.RUnlock()

klog.Infof("podName: %s, modelAdapters: %v", pod, modelsArr)
pod, ok := c.pods[podName]
if !ok {
return nil, fmt.Errorf("pod does not exist in the cache: %s", podName)
}

return pod, nil
}

func (c *Cache) GetPods() map[string]*v1.Pod {
Expand All @@ -342,24 +294,28 @@ func (c *Cache) GetPods() map[string]*v1.Pod {
return c.pods
}

func (c *Cache) GetPodsForModel(modelName string) map[string]*v1.Pod {
func (c *Cache) GetPodsForModel(modelName string) (map[string]*v1.Pod, error) {
c.mu.RLock()
defer c.mu.RUnlock()

pods := map[string]*v1.Pod{}
for podName, pod := range c.modelToPodMapping[modelName] {
klog.Info(podName)
pods[podName] = pod
podsMap, ok := c.modelToPodMapping[modelName]
if !ok {
return nil, fmt.Errorf("model does not exist in the cache: %s", modelName)
}

return pods
return podsMap, nil
}

func (c *Cache) GetPodToModelAdapterMapping() map[string]map[string]struct{} {
func (c *Cache) GetModelsForPod(podName string) (map[string]struct{}, error) {
c.mu.RLock()
defer c.mu.RUnlock()

return c.podToModelAdapterMapping
models, ok := c.podToModelMapping[podName]
if !ok {
return nil, fmt.Errorf("pod does not exist in the cache: %s", podName)
}

return models, nil
}

func (c *Cache) IncrPodRequestCount(podName string) int {
Expand Down
17 changes: 6 additions & 11 deletions pkg/controller/modeladapter/scheduling/leastadapters.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package scheduling

import (
"context"
"errors"
"math"

"github.com/aibrix/aibrix/pkg/cache"
Expand All @@ -37,24 +36,20 @@ func NewLeastAdapters(c *cache.Cache) Scheduler {
}

func (r leastAdapters) SelectPod(ctx context.Context, pods []v1.Pod) (*v1.Pod, error) {
modelAdapterCountMin := math.MaxInt
selectedPod := v1.Pod{}
podMap := r.cache.GetPods()
podToModelAdapterMapping := r.cache.GetPodToModelAdapterMapping()
modelAdapterCountMin := math.MaxInt

for _, pod := range pods {
if _, ok := podMap[pod.Name]; !ok {
return nil, errors.New("pod not found in the cache")
models, err := r.cache.GetModelsForPod(pod.Name)
if err != nil {
return nil, err
}

modelAdapters := podToModelAdapterMapping[pod.Name]
if len(modelAdapters) < modelAdapterCountMin {
if len(models) < modelAdapterCountMin {
selectedPod = pod
modelAdapterCountMin = len(modelAdapters)
modelAdapterCountMin = len(models)
}
}

klog.Infof("pod selected with least model adapters: %s", selectedPod.Name)

return &selectedPod, nil
}
Loading

0 comments on commit 77d5299

Please sign in to comment.