Skip to content

Commit

Permalink
Refactor metric source for customized protocol, port and path (#511)
Browse files Browse the repository at this point in the history
* refactor pa_types.go and modify metric client, fetcher

* fix test bug

* conduct ./hack/update-codegen.sh

* update config/crd/autoscaling, fix function name, update pa.yaml
  • Loading branch information
kr11 authored Dec 10, 2024
1 parent 83d2f03 commit a5e9849
Show file tree
Hide file tree
Showing 19 changed files with 224 additions and 130 deletions.
46 changes: 38 additions & 8 deletions api/autoscaling/v1alpha1/podautoscaler_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ limitations under the License.
package v1alpha1

import (
"fmt"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
Expand Down Expand Up @@ -61,10 +63,6 @@ type PodAutoscalerSpec struct {
// It cannot be less than minReplicas
MaxReplicas int32 `json:"maxReplicas"`

TargetMetric string `json:"targetMetric"`

TargetValue string `json:"targetValue"`

// MetricsSources defines a list of sources from which metrics are collected to make scaling decisions.
MetricsSources []MetricSource `json:"metricsSources,omitempty"`

Expand All @@ -86,14 +84,38 @@ const (
APA ScalingStrategyType = "APA"
)

// MetricSourceType names the kind of source a metric is collected from:
// either a set of k8s pods or a single domain.
type MetricSourceType string

const (
// POD indicates that k8s pods have to be scanned to fetch the data.
POD MetricSourceType = "pod"
// DOMAIN indicates that only the specified domain has to be accessed.
DOMAIN MetricSourceType = "domain"
)

// ProtocolType is the protocol ("http" or "https") used to access a metric source.
type ProtocolType string

const (
// HTTP is the "http" protocol.
HTTP ProtocolType = "http"
// HTTPS is the "https" protocol.
HTTPS ProtocolType = "https"
)

// MetricSource defines an endpoint and path from which metrics are collected.
type MetricSource struct {
// e.g. service1.example.com
Endpoint string `json:"endpoint"`
// access an endpoint or scan a list of k8s pod
MetricSourceType MetricSourceType `json:"metricSourceType"`
// http or https
ProtocolType ProtocolType `json:"protocolType"`
// e.g. service1.example.com. meaningless for MetricSourceType.POD
Endpoint string `json:"endpoint,omitempty"`
// e.g. /api/metrics/cpu
Path string `json:"path"`
// e.g. kv_cache_utilization metrics
Name string `json:"metric"`
// e.g. 8080. meaningless for MetricSourceType.DOMAIN
Port string `json:"port,omitempty"`
// TargetMetric identifies the specific metric to monitor (e.g., kv_cache_utilization).
TargetMetric string `json:"targetMetric"`
// TargetValue sets the desired threshold for the metric (e.g., 50 for 50% utilization).
TargetValue string `json:"targetValue"`
}

// PodAutoscalerStatus defines the observed state of PodAutoscaler
Expand Down Expand Up @@ -142,3 +164,11 @@ const (
// QPS is the requests per second reaching the Pod.
QPS = "qps"
)

// GetPaMetricSources returns the single MetricSource configured on pa.
//
// Only one entry in pa.Spec.MetricsSources is supported for now: when the list
// holds exactly one element it is returned, otherwise (zero or several
// entries) a zero-value MetricSource and an error are returned.
func GetPaMetricSources(pa PodAutoscaler) (MetricSource, error) {
	sources := pa.Spec.MetricsSources
	if len(sources) == 1 {
		return sources[0], nil
	}
	return MetricSource{}, fmt.Errorf("for now we only support one MetricsSource")
}
22 changes: 13 additions & 9 deletions config/crd/autoscaling/autoscaling.aibrix.ai_podautoscalers.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,24 @@ spec:
properties:
endpoint:
type: string
metric:
metricSourceType:
type: string
path:
type: string
port:
type: string
protocolType:
type: string
targetMetric:
type: string
targetValue:
type: string
required:
- endpoint
- metric
- metricSourceType
- path
- protocolType
- targetMetric
- targetValue
type: object
type: array
minReplicas:
Expand All @@ -67,16 +77,10 @@ spec:
x-kubernetes-map-type: atomic
scalingStrategy:
type: string
targetMetric:
type: string
targetValue:
type: string
required:
- maxReplicas
- scaleTargetRef
- scalingStrategy
- targetMetric
- targetValue
type: object
status:
properties:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ metadata:
labels:
app.kubernetes.io/name: aibrix
app.kubernetes.io/managed-by: kustomize
kpa.autoscaling.aibrix.ai/scale-down-delay: "0"
kpa.autoscaling.aibrix.ai/scale-down-delay: 30s
namespace: default
spec:
scaleTargetRef:
Expand All @@ -15,10 +15,11 @@ spec:
name: mock-llama2-7b
minReplicas: 0
maxReplicas: 10
targetMetric: "avg_prompt_throughput_toks_per_s" # Ignore if metricsSources is configured
metricsSources:
- endpoint: aibrix-gpu-optimizer.aibrix-system.svc.cluster.local:8080
path: /metrics/default/simulator-llama2-7b
metric: "vllm:deployment_replicas"
targetValue: "1"
metricsSources:
- metricSourceType: pod
protocolType: http
port: "8000"
path: metrics
targetMetric: "avg_prompt_throughput_toks_per_s"
targetValue: "60"
scalingStrategy: "KPA"
54 changes: 47 additions & 7 deletions pkg/client/applyconfiguration/autoscaling/v1alpha1/metricsource.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 8 additions & 3 deletions pkg/controller/podautoscaler/common/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,16 @@ func NewBaseScalingContext() *BaseScalingContext {

// UpdateByPaTypes should be invoked in any scaling context that embeds BaseScalingContext.
func (b *BaseScalingContext) UpdateByPaTypes(pa *autoscalingv1alpha1.PodAutoscaler) error {
b.ScalingMetric = pa.Spec.TargetMetric
source, err := autoscalingv1alpha1.GetPaMetricSources(*pa)
if err != nil {
return err
}

b.ScalingMetric = source.TargetMetric
// parse target value
targetValue, err := strconv.ParseFloat(pa.Spec.TargetValue, 64)
targetValue, err := strconv.ParseFloat(source.TargetValue, 64)
if err != nil {
klog.ErrorS(err, "Failed to parse target value", "targetValue", pa.Spec.TargetValue)
klog.ErrorS(err, "Failed to parse target value", "targetValue", source.TargetValue)
return err
}
b.TargetValue = targetValue
Expand Down
13 changes: 9 additions & 4 deletions pkg/controller/podautoscaler/hpa_resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,18 @@ func makeHPA(pa *pav1.PodAutoscaler) *autoscalingv2.HorizontalPodAutoscaler {
if minReplicas != nil && *minReplicas > 0 {
hpa.Spec.MinReplicas = minReplicas
}
source, err := pav1.GetPaMetricSources(*pa)
if err != nil {
klog.ErrorS(err, "Failed to GetPaMetricSources")
return nil
}

if targetValue, err := strconv.ParseFloat(pa.Spec.TargetValue, 64); err != nil {
if targetValue, err := strconv.ParseFloat(source.TargetValue, 64); err != nil {
klog.ErrorS(err, "Failed to parse target value")
} else {
klog.V(4).InfoS("Creating HPA", "metric", pa.Spec.TargetMetric, "target", targetValue)
klog.V(4).InfoS("Creating HPA", "metric", source.TargetMetric, "target", targetValue)

switch strings.ToLower(pa.Spec.TargetMetric) {
switch strings.ToLower(source.TargetMetric) {
case pav1.CPU:
cpu := int32(math.Ceil(targetValue))
hpa.Spec.Metrics = []autoscalingv2.MetricSpec{{
Expand Down Expand Up @@ -102,7 +107,7 @@ func makeHPA(pa *pav1.PodAutoscaler) *autoscalingv2.HorizontalPodAutoscaler {
Type: autoscalingv2.PodsMetricSourceType,
Pods: &autoscalingv2.PodsMetricSource{
Metric: autoscalingv2.MetricIdentifier{
Name: pa.Spec.TargetMetric,
Name: source.TargetMetric,
},
Target: autoscalingv2.MetricTarget{
Type: autoscalingv2.AverageValueMetricType,
Expand Down
20 changes: 10 additions & 10 deletions pkg/controller/podautoscaler/metrics/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,17 +127,17 @@ func (c *KPAMetricsClient) StableAndPanicMetrics(
return stableValue, panicValue, nil
}

func (c *KPAMetricsClient) GetPodContainerMetric(ctx context.Context, pod corev1.Pod, metricName string, metricPort int) (PodMetricsInfo, time.Time, error) {
return GetPodContainerMetric(ctx, c.fetcher, pod, metricName, metricPort)
func (c *KPAMetricsClient) GetPodContainerMetric(ctx context.Context, pod corev1.Pod, source autoscalingv1alpha1.MetricSource) (PodMetricsInfo, time.Time, error) {
return GetPodContainerMetric(ctx, c.fetcher, pod, source)
}

func (c *KPAMetricsClient) GetMetricsFromPods(ctx context.Context, pods []corev1.Pod, metricName string, metricPort int) ([]float64, error) {
return GetMetricsFromPods(ctx, c.fetcher, pods, metricName, metricPort)
func (c *KPAMetricsClient) GetMetricsFromPods(ctx context.Context, pods []corev1.Pod, source autoscalingv1alpha1.MetricSource) ([]float64, error) {
return GetMetricsFromPods(ctx, c.fetcher, pods, source)
}

// GetMetricFromSource fetches the value of source.TargetMetric from the
// endpoint and path described by source, and returns whatever the underlying
// fetcher returns (the metric value, or an error).
func (c *KPAMetricsClient) GetMetricFromSource(ctx context.Context, source autoscalingv1alpha1.MetricSource) (float64, error) {
	// Forward the protocol configured on the source instead of an empty
	// string: RestMetricsFetcher builds the URL as "<protocol>://<endpoint>/<path>",
	// so an empty protocol produces a URL with no scheme.
	return c.fetcher.FetchMetric(ctx, source.ProtocolType, source.Endpoint, source.Path, source.TargetMetric)
}

type APAMetricsClient struct {
Expand Down Expand Up @@ -213,14 +213,14 @@ func (c *APAMetricsClient) GetMetricValue(
return metricValue, nil
}

func (c *APAMetricsClient) GetPodContainerMetric(ctx context.Context, pod corev1.Pod, metricName string, metricPort int) (PodMetricsInfo, time.Time, error) {
return GetPodContainerMetric(ctx, c.fetcher, pod, metricName, metricPort)
func (c *APAMetricsClient) GetPodContainerMetric(ctx context.Context, pod corev1.Pod, source autoscalingv1alpha1.MetricSource) (PodMetricsInfo, time.Time, error) {
return GetPodContainerMetric(ctx, c.fetcher, pod, source)
}

func (c *APAMetricsClient) GetMetricsFromPods(ctx context.Context, pods []corev1.Pod, metricName string, metricPort int) ([]float64, error) {
return GetMetricsFromPods(ctx, c.fetcher, pods, metricName, metricPort)
func (c *APAMetricsClient) GetMetricsFromPods(ctx context.Context, pods []corev1.Pod, source autoscalingv1alpha1.MetricSource) ([]float64, error) {
return GetMetricsFromPods(ctx, c.fetcher, pods, source)
}

// GetMetricFromSource fetches the value of source.TargetMetric from the
// endpoint and path described by source, and returns whatever the underlying
// fetcher returns (the metric value, or an error).
func (c *APAMetricsClient) GetMetricFromSource(ctx context.Context, source autoscalingv1alpha1.MetricSource) (float64, error) {
	// Forward the protocol configured on the source instead of an empty
	// string: RestMetricsFetcher builds the URL as "<protocol>://<endpoint>/<path>",
	// so an empty protocol produces a URL with no scheme.
	return c.fetcher.FetchMetric(ctx, source.ProtocolType, source.Endpoint, source.Path, source.TargetMetric)
}
16 changes: 10 additions & 6 deletions pkg/controller/podautoscaler/metrics/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"net/http"
"strings"

autoscalingv1alpha1 "github.com/aibrix/aibrix/api/autoscaling/v1alpha1"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
Expand All @@ -45,9 +47,9 @@ const (
// MetricFetcher defines an interface for fetching metrics. it could be Kubernetes metrics or Pod prometheus metrics.
type MetricFetcher interface {
// Obsolete: call FetchMetric instead.
FetchPodMetrics(ctx context.Context, pod v1.Pod, metricsPort int, metricName string) (float64, error)
FetchPodMetrics(ctx context.Context, pod v1.Pod, source autoscalingv1alpha1.MetricSource) (float64, error)

FetchMetric(ctx context.Context, endpoint string, path string, metricName string) (float64, error)
FetchMetric(ctx context.Context, protocol autoscalingv1alpha1.ProtocolType, endpoint, path, metricName string) (float64, error)
}

type abstractMetricsFetcher struct{}
Expand All @@ -59,14 +61,16 @@ func (f *abstractMetricsFetcher) FetchMetric(ctx context.Context, pod v1.Pod, me
// RestMetricsFetcher implements MetricFetcher to fetch metrics from Pod's /metrics endpoint.
type RestMetricsFetcher struct{}

func (f *RestMetricsFetcher) FetchPodMetrics(ctx context.Context, pod v1.Pod, metricsPort int, metricName string) (float64, error) {
var _ MetricFetcher = (*RestMetricsFetcher)(nil)

func (f *RestMetricsFetcher) FetchPodMetrics(ctx context.Context, pod v1.Pod, source autoscalingv1alpha1.MetricSource) (float64, error) {
// Fetch the metric from the pod's IP, using the protocol, port and path configured in source
return f.FetchMetric(ctx, fmt.Sprintf("%s:%d", pod.Status.PodIP, metricsPort), "/metrics", metricName)
return f.FetchMetric(ctx, source.ProtocolType, fmt.Sprintf("%s:%s", pod.Status.PodIP, source.Port), source.Path, source.TargetMetric)
}

func (f *RestMetricsFetcher) FetchMetric(ctx context.Context, endpoint string, path string, metricName string) (float64, error) {
func (f *RestMetricsFetcher) FetchMetric(ctx context.Context, protocol autoscalingv1alpha1.ProtocolType, endpoint, path, metricName string) (float64, error) {
// Build the request URL from the given protocol, endpoint and path
url := fmt.Sprintf("http://%s/%s", endpoint, strings.TrimLeft(path, "/"))
url := fmt.Sprintf("%s://%s/%s", protocol, endpoint, strings.TrimLeft(path, "/"))

// Create request with context, so that the request will be canceled if the context is canceled
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
Expand Down
Loading

0 comments on commit a5e9849

Please sign in to comment.