diff --git a/api/autoscaling/v1alpha1/podautoscaler_types.go b/api/autoscaling/v1alpha1/podautoscaler_types.go index bd1bc623..8c8a9452 100644 --- a/api/autoscaling/v1alpha1/podautoscaler_types.go +++ b/api/autoscaling/v1alpha1/podautoscaler_types.go @@ -17,6 +17,8 @@ limitations under the License. package v1alpha1 import ( + "fmt" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -61,10 +63,6 @@ type PodAutoscalerSpec struct { // It cannot be less than minReplicas MaxReplicas int32 `json:"maxReplicas"` - TargetMetric string `json:"targetMetric"` - - TargetValue string `json:"targetValue"` - // MetricsSources defines a list of sources from which metrics are collected to make scaling decisions. MetricsSources []MetricSource `json:"metricsSources,omitempty"` @@ -86,14 +84,38 @@ const ( APA ScalingStrategyType = "APA" ) +type MetricSourceType string + +const ( + // POD need to scan all k8s pods to fetch the data + POD MetricSourceType = "pod" + // DOMAIN only need to access specified domain + DOMAIN MetricSourceType = "domain" +) + +type ProtocolType string + +const ( + HTTP ProtocolType = "http" + HTTPS ProtocolType = "https" +) + // MetricSource defines an endpoint and path from which metrics are collected. type MetricSource struct { - // e.g. service1.example.com - Endpoint string `json:"endpoint"` + // access an endpoint or scan a list of k8s pod + MetricSourceType MetricSourceType `json:"metricSourceType"` + // http or https + ProtocolType ProtocolType `json:"protocolType"` + // e.g. service1.example.com. meaningless for MetricSourceType.POD + Endpoint string `json:"endpoint,omitempty"` // e.g. /api/metrics/cpu Path string `json:"path"` - // e.g. kv_cache_utilization metrics - Name string `json:"metric"` + // e.g. 8080. meaningless for MetricSourceType.DOMAIN + Port string `json:"port,omitempty"` + // TargetMetric identifies the specific metric to monitor (e.g., kv_cache_utilization). 
+ TargetMetric string `json:"targetMetric"` + // TargetValue sets the desired threshold for the metric (e.g., 50 for 50% utilization). + TargetValue string `json:"targetValue"` } // PodAutoscalerStatus defines the observed state of PodAutoscaler @@ -142,3 +164,11 @@ const ( // QPS is the requests per second reaching the Pod. QPS = "qps" ) + +// GetPaMetricSources Currently, we don't support metric resources that are more than one yet. +func GetPaMetricSources(pa PodAutoscaler) (MetricSource, error) { + if len(pa.Spec.MetricsSources) != 1 { + return MetricSource{}, fmt.Errorf("for now we only support one MetricsSource") + } + return pa.Spec.MetricsSources[0], nil +} diff --git a/config/crd/autoscaling/autoscaling.aibrix.ai_podautoscalers.yaml b/config/crd/autoscaling/autoscaling.aibrix.ai_podautoscalers.yaml index 87b62054..a1fec170 100644 --- a/config/crd/autoscaling/autoscaling.aibrix.ai_podautoscalers.yaml +++ b/config/crd/autoscaling/autoscaling.aibrix.ai_podautoscalers.yaml @@ -34,14 +34,24 @@ spec: properties: endpoint: type: string - metric: + metricSourceType: type: string path: type: string + port: + type: string + protocolType: + type: string + targetMetric: + type: string + targetValue: + type: string required: - - endpoint - - metric + - metricSourceType - path + - protocolType + - targetMetric + - targetValue type: object type: array minReplicas: @@ -67,16 +77,10 @@ spec: x-kubernetes-map-type: atomic scalingStrategy: type: string - targetMetric: - type: string - targetValue: - type: string required: - maxReplicas - scaleTargetRef - scalingStrategy - - targetMetric - - targetValue type: object status: properties: diff --git a/development/app/config/templates/podautoscaler/podautoscaler.yaml b/development/app/config/templates/podautoscaler/podautoscaler_kpa.yaml similarity index 54% rename from development/app/config/templates/podautoscaler/podautoscaler.yaml rename to development/app/config/templates/podautoscaler/podautoscaler_kpa.yaml index 
042523fe..e55da8cd 100644 --- a/development/app/config/templates/podautoscaler/podautoscaler.yaml +++ b/development/app/config/templates/podautoscaler/podautoscaler_kpa.yaml @@ -6,7 +6,7 @@ metadata: labels: app.kubernetes.io/name: aibrix app.kubernetes.io/managed-by: kustomize - kpa.autoscaling.aibrix.ai/scale-down-delay: "0" + kpa.autoscaling.aibrix.ai/scale-down-delay: 30s namespace: default spec: scaleTargetRef: @@ -15,10 +15,11 @@ spec: name: mock-llama2-7b minReplicas: 0 maxReplicas: 10 - targetMetric: "avg_prompt_throughput_toks_per_s" # Ignore if metricsSources is configured - metricsSources: - - endpoint: aibrix-gpu-optimizer.aibrix-system.svc.cluster.local:8080 - path: /metrics/default/simulator-llama2-7b - metric: "vllm:deployment_replicas" - targetValue: "1" + metricsSources: + - metricSourceType: pod + protocolType: http + port: "8000" + path: metrics + targetMetric: "avg_prompt_throughput_toks_per_s" + targetValue: "60" scalingStrategy: "KPA" \ No newline at end of file diff --git a/pkg/client/applyconfiguration/autoscaling/v1alpha1/metricsource.go b/pkg/client/applyconfiguration/autoscaling/v1alpha1/metricsource.go index d35b6673..0252c367 100644 --- a/pkg/client/applyconfiguration/autoscaling/v1alpha1/metricsource.go +++ b/pkg/client/applyconfiguration/autoscaling/v1alpha1/metricsource.go @@ -17,12 +17,20 @@ limitations under the License. package v1alpha1 +import ( + v1alpha1 "github.com/aibrix/aibrix/api/autoscaling/v1alpha1" +) + // MetricSourceApplyConfiguration represents an declarative configuration of the MetricSource type for use // with apply. 
type MetricSourceApplyConfiguration struct { - Endpoint *string `json:"endpoint,omitempty"` - Path *string `json:"path,omitempty"` - Name *string `json:"metric,omitempty"` + MetricSourceType *v1alpha1.MetricSourceType `json:"metricSourceType,omitempty"` + ProtocolType *v1alpha1.ProtocolType `json:"protocolType,omitempty"` + Endpoint *string `json:"endpoint,omitempty"` + Path *string `json:"path,omitempty"` + Port *string `json:"port,omitempty"` + TargetMetric *string `json:"targetMetric,omitempty"` + TargetValue *string `json:"targetValue,omitempty"` } // MetricSourceApplyConfiguration constructs an declarative configuration of the MetricSource type for use with @@ -31,6 +39,22 @@ func MetricSource() *MetricSourceApplyConfiguration { return &MetricSourceApplyConfiguration{} } +// WithMetricSourceType sets the MetricSourceType field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the MetricSourceType field is set to the value of the last call. +func (b *MetricSourceApplyConfiguration) WithMetricSourceType(value v1alpha1.MetricSourceType) *MetricSourceApplyConfiguration { + b.MetricSourceType = &value + return b +} + +// WithProtocolType sets the ProtocolType field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the ProtocolType field is set to the value of the last call. +func (b *MetricSourceApplyConfiguration) WithProtocolType(value v1alpha1.ProtocolType) *MetricSourceApplyConfiguration { + b.ProtocolType = &value + return b +} + // WithEndpoint sets the Endpoint field in the declarative configuration to the given value // and returns the receiver, so that objects can be built by chaining "With" function invocations. // If called multiple times, the Endpoint field is set to the value of the last call. 
@@ -47,10 +71,26 @@ func (b *MetricSourceApplyConfiguration) WithPath(value string) *MetricSourceApp return b } -// WithName sets the Name field in the declarative configuration to the given value +// WithPort sets the Port field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the Port field is set to the value of the last call. +func (b *MetricSourceApplyConfiguration) WithPort(value string) *MetricSourceApplyConfiguration { + b.Port = &value + return b +} + +// WithTargetMetric sets the TargetMetric field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the TargetMetric field is set to the value of the last call. +func (b *MetricSourceApplyConfiguration) WithTargetMetric(value string) *MetricSourceApplyConfiguration { + b.TargetMetric = &value + return b +} + +// WithTargetValue sets the TargetValue field in the declarative configuration to the given value // and returns the receiver, so that objects can be built by chaining "With" function invocations. -// If called multiple times, the Name field is set to the value of the last call. -func (b *MetricSourceApplyConfiguration) WithName(value string) *MetricSourceApplyConfiguration { - b.Name = &value +// If called multiple times, the TargetValue field is set to the value of the last call. 
+func (b *MetricSourceApplyConfiguration) WithTargetValue(value string) *MetricSourceApplyConfiguration { + b.TargetValue = &value return b } diff --git a/pkg/client/applyconfiguration/autoscaling/v1alpha1/podautoscalerspec.go b/pkg/client/applyconfiguration/autoscaling/v1alpha1/podautoscalerspec.go index 38bfc6a5..1d0c362e 100644 --- a/pkg/client/applyconfiguration/autoscaling/v1alpha1/podautoscalerspec.go +++ b/pkg/client/applyconfiguration/autoscaling/v1alpha1/podautoscalerspec.go @@ -28,8 +28,6 @@ type PodAutoscalerSpecApplyConfiguration struct { ScaleTargetRef *v1.ObjectReference `json:"scaleTargetRef,omitempty"` MinReplicas *int32 `json:"minReplicas,omitempty"` MaxReplicas *int32 `json:"maxReplicas,omitempty"` - TargetMetric *string `json:"targetMetric,omitempty"` - TargetValue *string `json:"targetValue,omitempty"` MetricsSources []MetricSourceApplyConfiguration `json:"metricsSources,omitempty"` ScalingStrategy *autoscalingv1alpha1.ScalingStrategyType `json:"scalingStrategy,omitempty"` } @@ -64,22 +62,6 @@ func (b *PodAutoscalerSpecApplyConfiguration) WithMaxReplicas(value int32) *PodA return b } -// WithTargetMetric sets the TargetMetric field in the declarative configuration to the given value -// and returns the receiver, so that objects can be built by chaining "With" function invocations. -// If called multiple times, the TargetMetric field is set to the value of the last call. -func (b *PodAutoscalerSpecApplyConfiguration) WithTargetMetric(value string) *PodAutoscalerSpecApplyConfiguration { - b.TargetMetric = &value - return b -} - -// WithTargetValue sets the TargetValue field in the declarative configuration to the given value -// and returns the receiver, so that objects can be built by chaining "With" function invocations. -// If called multiple times, the TargetValue field is set to the value of the last call. 
-func (b *PodAutoscalerSpecApplyConfiguration) WithTargetValue(value string) *PodAutoscalerSpecApplyConfiguration { - b.TargetValue = &value - return b -} - // WithMetricsSources adds the given value to the MetricsSources field in the declarative configuration // and returns the receiver, so that objects can be build by chaining "With" function invocations. // If called multiple times, values provided by each call will be appended to the MetricsSources field. diff --git a/pkg/controller/podautoscaler/common/context.go b/pkg/controller/podautoscaler/common/context.go index 51f52b58..518dde62 100644 --- a/pkg/controller/podautoscaler/common/context.go +++ b/pkg/controller/podautoscaler/common/context.go @@ -71,11 +71,16 @@ func NewBaseScalingContext() *BaseScalingContext { // UpdateByPaTypes should be invoked in any scaling context that embeds BaseScalingContext. func (b *BaseScalingContext) UpdateByPaTypes(pa *autoscalingv1alpha1.PodAutoscaler) error { - b.ScalingMetric = pa.Spec.TargetMetric + source, err := autoscalingv1alpha1.GetPaMetricSources(*pa) + if err != nil { + return err + } + + b.ScalingMetric = source.TargetMetric // parse target value - targetValue, err := strconv.ParseFloat(pa.Spec.TargetValue, 64) + targetValue, err := strconv.ParseFloat(source.TargetValue, 64) if err != nil { - klog.ErrorS(err, "Failed to parse target value", "targetValue", pa.Spec.TargetValue) + klog.ErrorS(err, "Failed to parse target value", "targetValue", source.TargetValue) return err } b.TargetValue = targetValue diff --git a/pkg/controller/podautoscaler/hpa_resources.go b/pkg/controller/podautoscaler/hpa_resources.go index b072a98b..3c48750a 100644 --- a/pkg/controller/podautoscaler/hpa_resources.go +++ b/pkg/controller/podautoscaler/hpa_resources.go @@ -63,13 +63,18 @@ func makeHPA(pa *pav1.PodAutoscaler) *autoscalingv2.HorizontalPodAutoscaler { if minReplicas != nil && *minReplicas > 0 { hpa.Spec.MinReplicas = minReplicas } + source, err := pav1.GetPaMetricSources(*pa) + 
if err != nil { + klog.ErrorS(err, "Failed to GetPaMetricSources") + return nil + } - if targetValue, err := strconv.ParseFloat(pa.Spec.TargetValue, 64); err != nil { + if targetValue, err := strconv.ParseFloat(source.TargetValue, 64); err != nil { klog.ErrorS(err, "Failed to parse target value") } else { - klog.V(4).InfoS("Creating HPA", "metric", pa.Spec.TargetMetric, "target", targetValue) + klog.V(4).InfoS("Creating HPA", "metric", source.TargetMetric, "target", targetValue) - switch strings.ToLower(pa.Spec.TargetMetric) { + switch strings.ToLower(source.TargetMetric) { case pav1.CPU: cpu := int32(math.Ceil(targetValue)) hpa.Spec.Metrics = []autoscalingv2.MetricSpec{{ @@ -102,7 +107,7 @@ func makeHPA(pa *pav1.PodAutoscaler) *autoscalingv2.HorizontalPodAutoscaler { Type: autoscalingv2.PodsMetricSourceType, Pods: &autoscalingv2.PodsMetricSource{ Metric: autoscalingv2.MetricIdentifier{ - Name: pa.Spec.TargetMetric, + Name: source.TargetMetric, }, Target: autoscalingv2.MetricTarget{ Type: autoscalingv2.AverageValueMetricType, diff --git a/pkg/controller/podautoscaler/metrics/client.go b/pkg/controller/podautoscaler/metrics/client.go index 6e198710..054ad1a1 100644 --- a/pkg/controller/podautoscaler/metrics/client.go +++ b/pkg/controller/podautoscaler/metrics/client.go @@ -127,17 +127,17 @@ func (c *KPAMetricsClient) StableAndPanicMetrics( return stableValue, panicValue, nil } -func (c *KPAMetricsClient) GetPodContainerMetric(ctx context.Context, pod corev1.Pod, metricName string, metricPort int) (PodMetricsInfo, time.Time, error) { - return GetPodContainerMetric(ctx, c.fetcher, pod, metricName, metricPort) +func (c *KPAMetricsClient) GetPodContainerMetric(ctx context.Context, pod corev1.Pod, source autoscalingv1alpha1.MetricSource) (PodMetricsInfo, time.Time, error) { + return GetPodContainerMetric(ctx, c.fetcher, pod, source) } -func (c *KPAMetricsClient) GetMetricsFromPods(ctx context.Context, pods []corev1.Pod, metricName string, metricPort int) ([]float64, 
error) { - return GetMetricsFromPods(ctx, c.fetcher, pods, metricName, metricPort) +func (c *KPAMetricsClient) GetMetricsFromPods(ctx context.Context, pods []corev1.Pod, source autoscalingv1alpha1.MetricSource) ([]float64, error) { + return GetMetricsFromPods(ctx, c.fetcher, pods, source) } func (c *KPAMetricsClient) GetMetricFromSource(ctx context.Context, source autoscalingv1alpha1.MetricSource) (float64, error) { // Retrieve metrics from a list of pods - return c.fetcher.FetchMetric(ctx, source.Endpoint, source.Path, source.Name) + return c.fetcher.FetchMetric(ctx, source.ProtocolType, source.Endpoint, source.Path, source.TargetMetric) } type APAMetricsClient struct { @@ -213,14 +213,14 @@ func (c *APAMetricsClient) GetMetricValue( return metricValue, nil } -func (c *APAMetricsClient) GetPodContainerMetric(ctx context.Context, pod corev1.Pod, metricName string, metricPort int) (PodMetricsInfo, time.Time, error) { - return GetPodContainerMetric(ctx, c.fetcher, pod, metricName, metricPort) +func (c *APAMetricsClient) GetPodContainerMetric(ctx context.Context, pod corev1.Pod, source autoscalingv1alpha1.MetricSource) (PodMetricsInfo, time.Time, error) { + return GetPodContainerMetric(ctx, c.fetcher, pod, source) } -func (c *APAMetricsClient) GetMetricsFromPods(ctx context.Context, pods []corev1.Pod, metricName string, metricPort int) ([]float64, error) { - return GetMetricsFromPods(ctx, c.fetcher, pods, metricName, metricPort) +func (c *APAMetricsClient) GetMetricsFromPods(ctx context.Context, pods []corev1.Pod, source autoscalingv1alpha1.MetricSource) ([]float64, error) { + return GetMetricsFromPods(ctx, c.fetcher, pods, source) } func (c *APAMetricsClient) GetMetricFromSource(ctx context.Context, source autoscalingv1alpha1.MetricSource) (float64, error) { - return c.fetcher.FetchMetric(ctx, source.Endpoint, source.Path, source.Name) + return c.fetcher.FetchMetric(ctx, source.ProtocolType, source.Endpoint, source.Path, source.TargetMetric) } diff --git 
a/pkg/controller/podautoscaler/metrics/fetcher.go b/pkg/controller/podautoscaler/metrics/fetcher.go index 79b8fecc..957699d1 100644 --- a/pkg/controller/podautoscaler/metrics/fetcher.go +++ b/pkg/controller/podautoscaler/metrics/fetcher.go @@ -23,6 +23,8 @@ import ( "net/http" "strings" + autoscalingv1alpha1 "github.com/aibrix/aibrix/api/autoscaling/v1alpha1" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" @@ -45,9 +47,9 @@ const ( // MetricFetcher defines an interface for fetching metrics. it could be Kubernetes metrics or Pod prometheus metrics. type MetricFetcher interface { // Obseleted: Call FetchMetric instead. - FetchPodMetrics(ctx context.Context, pod v1.Pod, metricsPort int, metricName string) (float64, error) + FetchPodMetrics(ctx context.Context, pod v1.Pod, source autoscalingv1alpha1.MetricSource) (float64, error) - FetchMetric(ctx context.Context, endpoint string, path string, metricName string) (float64, error) + FetchMetric(ctx context.Context, protocol autoscalingv1alpha1.ProtocolType, endpoint, path, metricName string) (float64, error) } type abstractMetricsFetcher struct{} @@ -59,14 +61,16 @@ func (f *abstractMetricsFetcher) FetchMetric(ctx context.Context, pod v1.Pod, me // RestMetricsFetcher implements MetricFetcher to fetch metrics from Pod's /metrics endpoint. 
type RestMetricsFetcher struct{} -func (f *RestMetricsFetcher) FetchPodMetrics(ctx context.Context, pod v1.Pod, metricsPort int, metricName string) (float64, error) { +var _ MetricFetcher = (*RestMetricsFetcher)(nil) + +func (f *RestMetricsFetcher) FetchPodMetrics(ctx context.Context, pod v1.Pod, source autoscalingv1alpha1.MetricSource) (float64, error) { // Use /metrics to fetch pod's endpoint - return f.FetchMetric(ctx, fmt.Sprintf("%s:%d", pod.Status.PodIP, metricsPort), "/metrics", metricName) + return f.FetchMetric(ctx, source.ProtocolType, fmt.Sprintf("%s:%s", pod.Status.PodIP, source.Port), source.Path, source.TargetMetric) } -func (f *RestMetricsFetcher) FetchMetric(ctx context.Context, endpoint string, path string, metricName string) (float64, error) { +func (f *RestMetricsFetcher) FetchMetric(ctx context.Context, protocol autoscalingv1alpha1.ProtocolType, endpoint, path, metricName string) (float64, error) { // Use http to fetch endpoint - url := fmt.Sprintf("http://%s/%s", endpoint, strings.TrimLeft(path, "/")) + url := fmt.Sprintf("%s://%s/%s", protocol, endpoint, strings.TrimLeft(path, "/")) // Create request with context, so that the request will be canceled if the context is canceled req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) diff --git a/pkg/controller/podautoscaler/metrics/interface.go b/pkg/controller/podautoscaler/metrics/interface.go index 09dfcd5e..18e22976 100644 --- a/pkg/controller/podautoscaler/metrics/interface.go +++ b/pkg/controller/podautoscaler/metrics/interface.go @@ -18,6 +18,7 @@ package metrics import ( "context" + "fmt" "time" "k8s.io/apimachinery/pkg/types" @@ -33,19 +34,21 @@ type NamespaceNameMetric struct { MetricName string } -func NewNamespaceNameMetric(pa *autoscalingv1alpha1.PodAutoscaler) NamespaceNameMetric { - metricName := pa.Spec.TargetMetric - if len(pa.Spec.MetricsSources) > 0 { - metricName = pa.Spec.MetricsSources[0].Name +// NewNamespaceNameMetric creates a NamespaceNameMetric based on 
the PodAutoscaler's metrics source. +// For consistency, it will return the corresponding MetricSource. +// Currently, it supports only a single metric source. In the future, this could be extended to handle multiple metric sources. +func NewNamespaceNameMetric(pa *autoscalingv1alpha1.PodAutoscaler) (NamespaceNameMetric, autoscalingv1alpha1.MetricSource, error) { + if len(pa.Spec.MetricsSources) != 1 { + return NamespaceNameMetric{}, autoscalingv1alpha1.MetricSource{}, fmt.Errorf("metrics sources must be 1, but got %d", len(pa.Spec.MetricsSources)) } - + metricSource := pa.Spec.MetricsSources[0] return NamespaceNameMetric{ NamespacedName: types.NamespacedName{ Namespace: pa.Namespace, Name: pa.Spec.ScaleTargetRef.Name, }, - MetricName: metricName, - } + MetricName: metricSource.TargetMetric, + }, metricSource, nil } // PodMetric contains pod metric value (the metric values are expected to be the metric as a milli-value) @@ -69,9 +72,9 @@ type MetricClient interface { // for the specified named container in specific pods in the given namespace and when // the container is an empty string it returns the sum of all the container metrics. // TODO: should we use `metricKey` all the time? 
- GetPodContainerMetric(ctx context.Context, pod v1.Pod, metricName string, metricPort int) (PodMetricsInfo, time.Time, error) + GetPodContainerMetric(ctx context.Context, pod v1.Pod, source autoscalingv1alpha1.MetricSource) (PodMetricsInfo, time.Time, error) - GetMetricsFromPods(ctx context.Context, pods []v1.Pod, metricName string, metricPort int) ([]float64, error) + GetMetricsFromPods(ctx context.Context, pods []v1.Pod, source autoscalingv1alpha1.MetricSource) ([]float64, error) GetMetricFromSource(ctx context.Context, source autoscalingv1alpha1.MetricSource) (float64, error) diff --git a/pkg/controller/podautoscaler/metrics/utils.go b/pkg/controller/podautoscaler/metrics/utils.go index 3cee4563..812f4810 100644 --- a/pkg/controller/podautoscaler/metrics/utils.go +++ b/pkg/controller/podautoscaler/metrics/utils.go @@ -24,6 +24,8 @@ import ( "strings" "time" + autoscalingv1alpha1 "github.com/aibrix/aibrix/api/autoscaling/v1alpha1" + corev1 "k8s.io/api/core/v1" ) @@ -95,8 +97,8 @@ func GetMetricUsageRatio(metrics PodMetricsInfo, targetUsage int64) (usageRatio return float64(currentUsage) / float64(targetUsage), currentUsage } -func GetPodContainerMetric(ctx context.Context, fetcher MetricFetcher, pod corev1.Pod, metricName string, metricPort int) (PodMetricsInfo, time.Time, error) { - _, err := fetcher.FetchPodMetrics(ctx, pod, metricPort, metricName) +func GetPodContainerMetric(ctx context.Context, fetcher MetricFetcher, pod corev1.Pod, source autoscalingv1alpha1.MetricSource) (PodMetricsInfo, time.Time, error) { + _, err := fetcher.FetchPodMetrics(ctx, pod, source) currentTimestamp := time.Now() if err != nil { return nil, currentTimestamp, err @@ -106,11 +108,11 @@ func GetPodContainerMetric(ctx context.Context, fetcher MetricFetcher, pod corev return nil, currentTimestamp, nil } -func GetMetricsFromPods(ctx context.Context, fetcher MetricFetcher, pods []corev1.Pod, metricName string, metricPort int) ([]float64, error) { +func GetMetricsFromPods(ctx 
context.Context, fetcher MetricFetcher, pods []corev1.Pod, source autoscalingv1alpha1.MetricSource) ([]float64, error) { metrics := make([]float64, 0, len(pods)) for _, pod := range pods { // TODO: Let's optimize the performance for multi-metrics later. - metric, err := fetcher.FetchPodMetrics(ctx, pod, metricPort, metricName) + metric, err := fetcher.FetchPodMetrics(ctx, pod, source) if err != nil { return nil, err } diff --git a/pkg/controller/podautoscaler/podautoscaler_controller.go b/pkg/controller/podautoscaler/podautoscaler_controller.go index 4a668011..2d16f7aa 100644 --- a/pkg/controller/podautoscaler/podautoscaler_controller.go +++ b/pkg/controller/podautoscaler/podautoscaler_controller.go @@ -284,7 +284,11 @@ func (r *PodAutoscalerReconciler) reconcileCustomPA(ctx context.Context, pa auto paStatusOriginal := pa.Status.DeepCopy() paType := pa.Spec.ScalingStrategy scaleReference := fmt.Sprintf("%s/%s/%s", pa.Spec.ScaleTargetRef.Kind, pa.Namespace, pa.Spec.ScaleTargetRef.Name) - metricKey := metrics.NewNamespaceNameMetric(&pa) + metricKey, metricSource, err := metrics.NewNamespaceNameMetric(&pa) + if err != nil { + r.EventRecorder.Event(&pa, corev1.EventTypeWarning, "FailedGetMetricKey", err.Error()) + return ctrl.Result{}, err + } targetGV, err := schema.ParseGroupVersion(pa.Spec.ScaleTargetRef.APIVersion) if err != nil { @@ -325,7 +329,7 @@ func (r *PodAutoscalerReconciler) reconcileCustomPA(ctx context.Context, pa auto setCondition(&pa, "AbleToScale", metav1.ConditionTrue, "SucceededGetScale", "the %s controller was able to get the target's current scale", paType) // Update the scale required metrics periodically - err = r.updateMetricsForScale(ctx, pa, scale, metricKey) + err = r.updateMetricsForScale(ctx, pa, scale, metricKey, metricSource) if err != nil { r.EventRecorder.Event(&pa, corev1.EventTypeWarning, "FailedUpdateMetrics", err.Error()) return ctrl.Result{}, fmt.Errorf("failed to update metrics for scale target reference: %v", err) @@ -608,7 
+612,7 @@ func (r *PodAutoscalerReconciler) updateScalerSpec(ctx context.Context, pa autos return autoScaler.UpdateScalingContext(pa) } -func (r *PodAutoscalerReconciler) updateMetricsForScale(ctx context.Context, pa autoscalingv1alpha1.PodAutoscaler, scale *unstructured.Unstructured, metricKey metrics.NamespaceNameMetric) (err error) { +func (r *PodAutoscalerReconciler) updateMetricsForScale(ctx context.Context, pa autoscalingv1alpha1.PodAutoscaler, scale *unstructured.Unstructured, metricKey metrics.NamespaceNameMetric, metricSource autoscalingv1alpha1.MetricSource) (err error) { currentTimestamp := time.Now() var autoScaler scaler.Scaler autoScaler, exists := r.AutoscalerMap[metricKey] @@ -640,11 +644,6 @@ func (r *PodAutoscalerReconciler) updateMetricsForScale(ctx context.Context, pa } } - // update metrics - for _, source := range pa.Spec.MetricsSources { - return autoScaler.UpdateSourceMetrics(ctx, metricKey, source, currentTimestamp) - } - // Retrieve the selector string from the Scale object's Status, // and convert *metav1.LabelSelector object to labels.Selector structure labelsSelector, err := extractLabelSelector(scale) @@ -662,10 +661,12 @@ func (r *PodAutoscalerReconciler) updateMetricsForScale(ctx context.Context, pa // TODO: do we need to indicate the metrics source. 
// Technically, the metrics could come from Kubernetes metrics API (resource or custom), pod prometheus endpoint or ai runtime - // Update targets - if err := autoScaler.UpdateScaleTargetMetrics(ctx, metricKey, podList.Items, currentTimestamp); err != nil { - return err + switch metricSource.MetricSourceType { + case autoscalingv1alpha1.POD: + return autoScaler.UpdateScaleTargetMetrics(ctx, metricKey, metricSource, podList.Items, currentTimestamp) + case autoscalingv1alpha1.DOMAIN: + return autoScaler.UpdateSourceMetrics(ctx, metricKey, metricSource, currentTimestamp) + default: + return fmt.Errorf("unsupported metric source type: %q", metricSource.MetricSourceType) } - - return nil } diff --git a/pkg/controller/podautoscaler/podautoscaler_controller_test.go b/pkg/controller/podautoscaler/podautoscaler_controller_test.go index 1afc2122..2ef56e4c 100644 --- a/pkg/controller/podautoscaler/podautoscaler_controller_test.go +++ b/pkg/controller/podautoscaler/podautoscaler_controller_test.go @@ -66,6 +66,14 @@ var _ = Describe("PodAutoscaler Controller", func() { MinReplicas: IntToPtr(5), MaxReplicas: 10, ScalingStrategy: "HPA", + MetricsSources: []autoscalingv1alpha1.MetricSource{ + { + MetricSourceType: autoscalingv1alpha1.POD, + ProtocolType: autoscalingv1alpha1.HTTP, + TargetMetric: "test_metric", + TargetValue: "1", + }, + }, }, } Expect(k8sClient.Create(ctx, resource)).To(Succeed()) diff --git a/pkg/controller/podautoscaler/scaler/apa.go b/pkg/controller/podautoscaler/scaler/apa.go index c3583272..76aa88ee 100644 --- a/pkg/controller/podautoscaler/scaler/apa.go +++ b/pkg/controller/podautoscaler/scaler/apa.go @@ -164,10 +164,8 @@ func (a *ApaAutoscaler) Scale(originalReadyPodsCount int, metricKey metrics.Name } } -func (a *ApaAutoscaler) UpdateScaleTargetMetrics(ctx context.Context, metricKey metrics.NamespaceNameMetric, pods []v1.Pod, now time.Time) error { - // TODO: let's update this fix port later. 
- metricPort := 8000 - metricValues, err := a.metricClient.GetMetricsFromPods(ctx, pods, metricKey.MetricName, metricPort) +func (a *ApaAutoscaler) UpdateScaleTargetMetrics(ctx context.Context, metricKey metrics.NamespaceNameMetric, source autoscalingv1alpha1.MetricSource, pods []v1.Pod, now time.Time) error { + metricValues, err := a.metricClient.GetMetricsFromPods(ctx, pods, source) if err != nil { return err } diff --git a/pkg/controller/podautoscaler/scaler/apa_test.go b/pkg/controller/podautoscaler/scaler/apa_test.go index 2a39a972..42bee354 100644 --- a/pkg/controller/podautoscaler/scaler/apa_test.go +++ b/pkg/controller/podautoscaler/scaler/apa_test.go @@ -17,6 +17,7 @@ limitations under the License. package scaler import ( + "fmt" "testing" "time" @@ -45,14 +46,24 @@ func TestAPAScale(t *testing.T) { Namespace: "test_ns", }, Spec: autoscalingv1alpha1.PodAutoscalerSpec{ - TargetMetric: "ttot", // Set TargetMetric to "ttot" + MetricsSources: []autoscalingv1alpha1.MetricSource{ + { + MetricSourceType: autoscalingv1alpha1.POD, + ProtocolType: autoscalingv1alpha1.HTTP, + TargetMetric: spec.ScalingMetric, + TargetValue: fmt.Sprintf("%f", spec.TargetValue), + }, + }, ScaleTargetRef: corev1.ObjectReference{ Name: "llama-70b", }, }, } - metricKey := metrics.NewNamespaceNameMetric(&pa) + metricKey, _, err := metrics.NewNamespaceNameMetric(&pa) + if err != nil { + t.Errorf("NewNamespaceNameMetric() failed: %v", err) + } _ = apaMetricsClient.UpdateMetricIntoWindow(now.Add(-60*time.Second), 10.0) _ = apaMetricsClient.UpdateMetricIntoWindow(now.Add(-50*time.Second), 11.0) _ = apaMetricsClient.UpdateMetricIntoWindow(now.Add(-40*time.Second), 12.0) @@ -94,14 +105,14 @@ func TestApaUpdateContext(t *testing.T) { Kind: "Deployment", Name: "example-deployment", }, - MinReplicas: nil, // expecting nil as default since it's a pointer and no value is assigned - MaxReplicas: 5, - TargetValue: "1", - TargetMetric: "test.metrics", + MinReplicas: nil, // expecting nil as default 
since it's a pointer and no value is assigned + MaxReplicas: 5, MetricsSources: []autoscalingv1alpha1.MetricSource{ { - Endpoint: "service1.example.com", - Path: "/api/metrics/cpu", + Endpoint: "service1.example.com", + Path: "/api/metrics/cpu", + TargetValue: "1", + TargetMetric: "test.metrics", }, }, ScalingStrategy: "APA", diff --git a/pkg/controller/podautoscaler/scaler/interface.go b/pkg/controller/podautoscaler/scaler/interface.go index 54f52107..291c71a8 100644 --- a/pkg/controller/podautoscaler/scaler/interface.go +++ b/pkg/controller/podautoscaler/scaler/interface.go @@ -54,7 +54,7 @@ type Scaler interface { // when the last metric update occurred and can be used to calculate time-based scaling actions. // // This method ensures that the autoscaler has up-to-date metrics before making any scaling decisions. - UpdateScaleTargetMetrics(ctx context.Context, metricKey metrics.NamespaceNameMetric, pods []corev1.Pod, now time.Time) error + UpdateScaleTargetMetrics(ctx context.Context, metricKey metrics.NamespaceNameMetric, source autoscalingv1alpha1.MetricSource, pods []corev1.Pod, now time.Time) error // UpdateSourceMetrics updates the current state of metrics used to determine scaling actions. 
// It processes the latest metrics for a metrics source and stores diff --git a/pkg/controller/podautoscaler/scaler/kpa.go b/pkg/controller/podautoscaler/scaler/kpa.go index 9fe78473..4b793728 100644 --- a/pkg/controller/podautoscaler/scaler/kpa.go +++ b/pkg/controller/podautoscaler/scaler/kpa.go @@ -123,6 +123,7 @@ func NewKpaScalingContextByPa(pa *autoscalingv1alpha1.PodAutoscaler) (*KpaScalin } func (k *KpaScalingContext) UpdateByPaTypes(pa *autoscalingv1alpha1.PodAutoscaler) error { + err := k.BaseScalingContext.UpdateByPaTypes(pa) if err != nil { return err @@ -381,10 +382,8 @@ func (k *KpaAutoscaler) Scale(originalReadyPodsCount int, metricKey metrics.Name } } -func (k *KpaAutoscaler) UpdateScaleTargetMetrics(ctx context.Context, metricKey metrics.NamespaceNameMetric, pods []v1.Pod, now time.Time) error { - // TODO: let's update this fix port later. - metricPort := 8000 - metricValues, err := k.metricClient.GetMetricsFromPods(ctx, pods, metricKey.MetricName, metricPort) +func (k *KpaAutoscaler) UpdateScaleTargetMetrics(ctx context.Context, metricKey metrics.NamespaceNameMetric, source autoscalingv1alpha1.MetricSource, pods []v1.Pod, now time.Time) error { + metricValues, err := k.metricClient.GetMetricsFromPods(ctx, pods, source) if err != nil { return err } diff --git a/pkg/controller/podautoscaler/scaler/kpa_test.go b/pkg/controller/podautoscaler/scaler/kpa_test.go index d631987d..e0a5d8c2 100644 --- a/pkg/controller/podautoscaler/scaler/kpa_test.go +++ b/pkg/controller/podautoscaler/scaler/kpa_test.go @@ -17,6 +17,7 @@ limitations under the License. 
package scaler import ( + "fmt" "testing" "time" @@ -80,10 +81,12 @@ func TestKpaScale(t *testing.T) { Namespace: "test_ns", }, Spec: autoscalingv1alpha1.PodAutoscalerSpec{ - TargetMetric: spec.ScalingMetric, MetricsSources: []autoscalingv1alpha1.MetricSource{ { - Name: spec.ScalingMetric, + MetricSourceType: autoscalingv1alpha1.POD, + ProtocolType: autoscalingv1alpha1.HTTP, + TargetMetric: spec.ScalingMetric, + TargetValue: fmt.Sprintf("%f", spec.TargetValue), }, }, ScaleTargetRef: corev1.ObjectReference{ @@ -92,7 +95,11 @@ func TestKpaScale(t *testing.T) { }, } - metricKey := metrics.NewNamespaceNameMetric(&pa) + metricKey, _, err := metrics.NewNamespaceNameMetric(&pa) + + if err != nil { + t.Errorf("NewNamespaceNameMetric() failed: %v", err) + } result := kpaScaler.Scale(readyPodCount, metricKey, now) // recent rapid rising metric value make scaler adapt turn on panic mode @@ -108,14 +115,14 @@ func TestKpaUpdateContext(t *testing.T) { Kind: "Deployment", Name: "example-deployment", }, - MinReplicas: nil, // expecting nil as default since it's a pointer and no value is assigned - MaxReplicas: 5, - TargetValue: "1", - TargetMetric: "test.metrics", + MinReplicas: nil, // expecting nil as default since it's a pointer and no value is assigned + MaxReplicas: 5, MetricsSources: []autoscalingv1alpha1.MetricSource{ { - Endpoint: "service1.example.com", - Path: "/api/metrics/cpu", + Endpoint: "service1.example.com", + Path: "/api/metrics/cpu", + TargetMetric: "test.metrics", + TargetValue: "1", }, }, ScalingStrategy: "KPA", diff --git a/pkg/controller/podautoscaler/utils.go b/pkg/controller/podautoscaler/utils.go index ad5f81a6..7385943f 100644 --- a/pkg/controller/podautoscaler/utils.go +++ b/pkg/controller/podautoscaler/utils.go @@ -19,8 +19,6 @@ package podautoscaler import ( "fmt" - autoscalingv1alpha1 "github.com/aibrix/aibrix/api/autoscaling/v1alpha1" - "github.com/aibrix/aibrix/pkg/controller/podautoscaler/metrics" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" 
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/labels" @@ -52,7 +50,3 @@ func extractLabelSelector(scale *unstructured.Unstructured) (labels.Selector, er return labelsSelector, nil } - -func NewNamespaceNameMetricByPa(pa autoscalingv1alpha1.PodAutoscaler) metrics.NamespaceNameMetric { - return metrics.NewNamespaceNameMetric(&pa) -}