From a70c82dde28463bec6443a1c3d2495d84f0ac893 Mon Sep 17 00:00:00 2001
From: Rong-Kang
Date: Sat, 5 Oct 2024 12:51:19 +0800
Subject: [PATCH] Add AIBrix Custom Autoscaling Algorithm APA (#223)

* add apa algorithm

* Add HPCA algorithm as an implementation of APA

* fix make lint

* Add separate pod-autoscaler service account

---------

Co-authored-by: Jiaxin Shan
---
 .../autoscaling_v1alpha1_mock_llama_apa.yaml  | 18 +++++
 docs/development/app/app.py                   |  2 +-
 docs/development/app/deployment.yaml          | 54 +++++++++++++
 docs/tutorial/podautoscaler/README.md         | 77 +++++++++++++++++-
 .../podautoscaler/podautoscaler_controller.go | 13 ++-
 .../podautoscaler/scaler/apa_test.go          | 79 +++++++++++++++++++
 .../podautoscaler/scaler/interface.go         |  4 +-
 pkg/controller/podautoscaler/scaler/kpa.go    | 75 +++++++++++++++---
 .../podautoscaler/scaler/kpa_test.go          |  4 +-
 9 files changed, 303 insertions(+), 23 deletions(-)
 create mode 100644 config/samples/autoscaling_v1alpha1_mock_llama_apa.yaml
 create mode 100644 pkg/controller/podautoscaler/scaler/apa_test.go

diff --git a/config/samples/autoscaling_v1alpha1_mock_llama_apa.yaml b/config/samples/autoscaling_v1alpha1_mock_llama_apa.yaml
new file mode 100644
index 00000000..a3306fb1
--- /dev/null
+++ b/config/samples/autoscaling_v1alpha1_mock_llama_apa.yaml
@@ -0,0 +1,18 @@
+apiVersion: autoscaling.aibrix.ai/v1alpha1
+kind: PodAutoscaler
+metadata:
+  name: podautoscaler-example-mock-llama-apa
+  labels:
+    app.kubernetes.io/name: aibrix
+    app.kubernetes.io/managed-by: kustomize
+  namespace: aibrix-system
+spec:
+  scaleTargetRef:
+    apiVersion: apps/v1
+    kind: Deployment
+    name: llama2-70b
+  minReplicas: 1
+  maxReplicas: 10
+  targetMetric: "avg_prompt_throughput_toks_per_s"
+  targetValue: "20"
+  scalingStrategy: "APA"
\ No newline at end of file
diff --git a/docs/development/app/app.py b/docs/development/app/app.py
index c1434851..7bc8e188 100644
--- a/docs/development/app/app.py
+++ b/docs/development/app/app.py
@@ -11,7 +11,7 @@
 MODEL_NAME = 'llama2-70b'
 
 DEPLOYMENT_NAME = os.getenv('DEPLOYMENT_NAME', 'llama2-70b')
-NAMESPACE = os.getenv('NAMESPACE', 'default')
+NAMESPACE = os.getenv('NAMESPACE', 'aibrix-system')
 DEFAULT_REPLICAS = int(os.getenv('DEFAULT_REPLICAS', '1'))
 
 models = [
diff --git a/docs/development/app/deployment.yaml b/docs/development/app/deployment.yaml
index 28f0280d..69907f23 100644
--- a/docs/development/app/deployment.yaml
+++ b/docs/development/app/deployment.yaml
@@ -53,6 +53,60 @@ spec:
       targetPort: 8000
       nodePort: 30081
   type: NodePort
+---
+apiVersion: v1
+kind: ServiceAccount
+metadata:
+  name: pod-autoscaler
+  namespace: aibrix-system
+---
+apiVersion: rbac.authorization.k8s.io/v1
+kind: Role
+metadata:
+  name: pod-reader
+  namespace: aibrix-system
+rules:
+  - apiGroups: [""]
+    resources: ["pods"]
+    verbs: ["get", "list", "watch"]
+---
+apiVersion: rbac.authorization.k8s.io/v1
+kind: RoleBinding
+metadata:
+  name: read-pods
+  namespace: aibrix-system
+subjects:
+  - kind: ServiceAccount
+    name: pod-autoscaler
+    namespace: aibrix-system
+roleRef:
+  kind: Role
+  name: pod-reader
+  apiGroup: rbac.authorization.k8s.io
+---
+apiVersion: rbac.authorization.k8s.io/v1
+kind: Role
+metadata:
+  namespace: aibrix-system
+  name: deployment-reader
+rules:
+  - apiGroups: ["apps"]
+    resources: ["deployments"]
+    verbs: ["get", "list", "watch"]
+---
+apiVersion: rbac.authorization.k8s.io/v1
+kind: RoleBinding
+metadata:
+  name: deployment-reader-binding
+  namespace: aibrix-system
+subjects:
+  - kind: ServiceAccount
+    name: pod-autoscaler
+    namespace: aibrix-system
+roleRef:
+  kind: Role
+  name: deployment-reader
+  apiGroup: rbac.authorization.k8s.io
 # ---
 # for test-purpose, if need to create HTTPRoute object manually
 # apiVersion: gateway.networking.k8s.io/v1
diff --git a/docs/tutorial/podautoscaler/README.md b/docs/tutorial/podautoscaler/README.md
index b69a27ab..a4715ab9 100644
--- a/docs/tutorial/podautoscaler/README.md
+++ b/docs/tutorial/podautoscaler/README.md
@@ -55,7 +55,7 @@ Starting workers {"controller": "podautoscaler", "controllerGroup": "autoscaling
 For debugging purposes, you can expose the ports in Kubernetes using the following command:
 
 ```shell
-kubectl port-forward svc/llama2-70b 8000:8000
+kubectl port-forward svc/llama2-70b 8000:8000 -n aibrix-system
 ```
 
 ## Start 2: Build and Deploy Manager
@@ -308,6 +308,80 @@ kubectl get pods -n aibrix-system -o name | grep aibrix-controller-manager | hea
 The Mocked Llama has an average prompt throughput of 100 tokens per second (`avg_prompt_throughput_toks_per_s`). The AutoScaler aims to maintain each pod's metrics at 20.
 
 As indicated in the events, the KPA podautoscaler adjusted the replicas from 3 to 5.
+
+# Case 4: Create an APA-Based AIBrix Pod Autoscaler on Mocked Llama
+
+## Launching Mocked Llama
+
+The Mocked Llama is a simulation of a vLLM-based Llama deployment. It provides mocked metrics for scaling purposes, following the standard Prometheus protocol.
+
+For a detailed introduction, refer to the [README.md](../../development/app/README.md).
+
+### Deployment on K8S
+
+Deploy using the following commands:
+
+```shell
+kubectl apply -f docs/development/app/deployment.yaml
+kubectl get deployments --all-namespaces | grep llama2
+```
+
+You should see a deployment status similar to this:
+
+```log
+NAME         READY   UP-TO-DATE   AVAILABLE   AGE
+llama2-70b   3/3     3            3           16s
+```
+
+## Autoscaling
+
+If you have created other autoscalers on this mocked Llama deployment, delete them first:
+```shell
+kubectl delete podautoscalers.autoscaling.aibrix.ai podautoscaler-example-mock-llama -n aibrix-system
+kubectl delete podautoscalers.autoscaling.aibrix.ai podautoscaler-example-mock-llama-apa -n aibrix-system
+```
+
+Create an autoscaler of type APA:
+
+```shell
+kubectl apply -f config/samples/autoscaling_v1alpha1_mock_llama_apa.yaml
+kubectl get podautoscalers --all-namespaces
+```
+
+```log
+NAMESPACE       NAME                                   AGE
+aibrix-system   podautoscaler-example-mock-llama-apa   65m
+```
+
+## Scaling Results, Logs and Events
+
+
+```shell
+kubectl get deployments --all-namespaces | grep llama2
+```
+
+The deployment has been rescaled to 5 replicas:
+
+```log
+aibrix-system   llama2-70b   5/5   5   5   65m
+```
+
+Check the events of the APA podautoscaler to see the scaling details:
+
+```shell
+kubectl describe podautoscalers podautoscaler-example-mock-llama-apa -n aibrix-system
+```
+
+```log
+Events:
+  Type    Reason             Age   From           Message
+  ----    ------             ----  ----           -------
+  Normal  AlgorithmRun       78s   PodAutoscaler  APA algorithm run. currentReplicas: 3, desiredReplicas: 5, rescale: true
+  Normal  SuccessfulRescale  78s   PodAutoscaler  New size: 5; reason: avg_prompt_throughput_toks_per_s above target
+  Normal  AlgorithmRun       77s   PodAutoscaler  APA algorithm run. currentReplicas: 5, desiredReplicas: 5, rescale: false
+```
+
+
 # Cleanup
 
 To clean up the resources:
@@ -316,6 +390,7 @@ To clean up the resources:
 # Remove AIBrix resources
 kubectl delete podautoscalers.autoscaling.aibrix.ai podautoscaler-example
 kubectl delete podautoscalers.autoscaling.aibrix.ai podautoscaler-example-mock-llama -n aibrix-system
+kubectl delete podautoscalers.autoscaling.aibrix.ai podautoscaler-example-mock-llama-apa -n aibrix-system
 
 make uninstall && make undeploy
 
diff --git a/pkg/controller/podautoscaler/podautoscaler_controller.go b/pkg/controller/podautoscaler/podautoscaler_controller.go
index 6ffdd501..78ea0120 100644
--- a/pkg/controller/podautoscaler/podautoscaler_controller.go
+++ b/pkg/controller/podautoscaler/podautoscaler_controller.go
@@ -144,10 +144,8 @@ func (r *PodAutoscalerReconciler) Reconcile(ctx context.Context, req ctrl.Reques
 	switch pa.Spec.ScalingStrategy {
 	case autoscalingv1alpha1.HPA:
 		return r.reconcileHPA(ctx, pa)
-	case autoscalingv1alpha1.KPA:
+	case autoscalingv1alpha1.KPA, autoscalingv1alpha1.APA:
 		return r.reconcileKPA(ctx, pa)
-	case autoscalingv1alpha1.APA:
-		return r.reconcileAPA(ctx, pa)
 	default:
 		return ctrl.Result{}, fmt.Errorf("unknown autoscaling strategy: %s", pa.Spec.ScalingStrategy)
 	}
@@ -315,10 +313,9 @@ func (r *PodAutoscalerReconciler) reconcileKPA(ctx context.Context, pa autoscali
 		rescale = desiredReplicas != currentReplicas
 	}
 
-	r.EventRecorder.Eventf(&pa, corev1.EventTypeNormal, "KPAAlgorithmRun",
-		"KPA algorithm run. currentReplicas: %d, desiredReplicas: %d, rescale: %t",
-		desiredReplicas, currentReplicas, rescale)
-
+	r.EventRecorder.Eventf(&pa, corev1.EventTypeNormal, "AlgorithmRun",
+		"%s algorithm run. currentReplicas: %d, desiredReplicas: %d, rescale: %t",
+		pa.Spec.ScalingStrategy, currentReplicas, desiredReplicas, rescale)
 
 	if rescale {
 		if err := r.updateScale(ctx, pa.Namespace, targetGR, scale, desiredReplicas); err != nil {
@@ -496,7 +493,7 @@ func (r *PodAutoscalerReconciler) computeReplicasForMetrics(ctx context.Context,
 	metricKey := metrics.NewNamespaceNameMetric(pa.Namespace, pa.Spec.ScaleTargetRef.Name, pa.Spec.TargetMetric)
 
 	// Calculate the desired number of pods using the autoscaler logic.
-	scaleResult := r.Autoscaler.Scale(int(originalReadyPodsCount), metricKey, currentTimestamp)
+	scaleResult := r.Autoscaler.Scale(int(originalReadyPodsCount), metricKey, currentTimestamp, pa.Spec.ScalingStrategy)
 	if scaleResult.ScaleValid {
 		logger.V(4).Info("Successfully called Scale Algorithm", "scaleResult", scaleResult)
 		return scaleResult.DesiredPodCount, metricKey.MetricName, currentTimestamp, nil
diff --git a/pkg/controller/podautoscaler/scaler/apa_test.go b/pkg/controller/podautoscaler/scaler/apa_test.go
new file mode 100644
index 00000000..76e122fb
--- /dev/null
+++ b/pkg/controller/podautoscaler/scaler/apa_test.go
@@ -0,0 +1,79 @@
+/*
+Copyright 2024 The Aibrix Team.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package scaler
+
+import (
+	"testing"
+	"time"
+
+	autoscalingv1alpha1 "github.com/aibrix/aibrix/api/autoscaling/v1alpha1"
+
+	"github.com/aibrix/aibrix/pkg/controller/podautoscaler/metrics"
+)
+
+// TestAPAScale tests the APA behavior. For now, APA is implemented via the HCPA algorithm.
+func TestAPAScale(t *testing.T) {
+	readyPodCount := 5
+	kpaMetricsClient := metrics.NewKPAMetricsClient()
+	now := time.Now()
+	metricKey := metrics.NewNamespaceNameMetric("test_ns", "llama-70b", "ttot")
+	_ = kpaMetricsClient.UpdateMetricIntoWindow(metricKey, now.Add(-60*time.Second), 10.0)
+	_ = kpaMetricsClient.UpdateMetricIntoWindow(metricKey, now.Add(-50*time.Second), 11.0)
+	_ = kpaMetricsClient.UpdateMetricIntoWindow(metricKey, now.Add(-40*time.Second), 12.0)
+	_ = kpaMetricsClient.UpdateMetricIntoWindow(metricKey, now.Add(-30*time.Second), 13.0)
+	_ = kpaMetricsClient.UpdateMetricIntoWindow(metricKey, now.Add(-20*time.Second), 14.0)
+	_ = kpaMetricsClient.UpdateMetricIntoWindow(metricKey, now.Add(-10*time.Second), 100.0)
+
+	kpaScaler, err := NewKpaAutoscaler(readyPodCount,
+		&DeciderKpaSpec{
+			MaxScaleUpRate:           2,
+			MaxScaleDownRate:         2,
+			ScalingMetric:            metricKey.MetricName,
+			TargetValue:              10,
+			TotalValue:               500,
+			PanicThreshold:           2.0,
+			StableWindow:             60 * time.Second,
+			ScaleDownDelay:           10 * time.Second,
+			ActivationScale:          2,
+			UpFluctuationTolerance:   0.1,
+			DownFluctuationTolerance: 0.2,
+		},
+	)
+	kpaScaler.metricsClient = kpaMetricsClient
+	if err != nil {
+		t.Errorf("Failed to create KpaAutoscaler: %v", err)
+	}
+	ticker := time.NewTicker(10 * time.Second)
+	defer ticker.Stop()
+
+	// test 1:
+	result := kpaScaler.Scale(readyPodCount, metricKey, now, autoscalingv1alpha1.APA)
+	// The sharp rise in the most recent metric value pushes per-pod usage far above target, so APA scales up to the rate-limited cap.
+	if result.DesiredPodCount != 10 {
+		t.Errorf("expected result.DesiredPodCount = 10, got %d", result.DesiredPodCount)
+	}
+
+	// test 2:
+	// UpFluctuationTolerance = 1.1 means APA won't scale up unless current usage > TargetValue * (1 + 1.1), i.e. 210%.
+	// In this test case, APA should therefore not scale up.
+	kpaScaler.deciderSpec.UpFluctuationTolerance = 1.1
+	result = kpaScaler.Scale(readyPodCount, metricKey, now, autoscalingv1alpha1.APA)
+	// Usage stays within the widened tolerance band, so the replica count is unchanged.
+	if result.DesiredPodCount != int32(readyPodCount) {
+		t.Errorf("result should remain at previous replicas = %d, but got %d", readyPodCount, result.DesiredPodCount)
+	}
+}
diff --git a/pkg/controller/podautoscaler/scaler/interface.go b/pkg/controller/podautoscaler/scaler/interface.go
index ef0ac396..c225ad44 100644
--- a/pkg/controller/podautoscaler/scaler/interface.go
+++ b/pkg/controller/podautoscaler/scaler/interface.go
@@ -20,6 +20,8 @@ import (
 	"sync"
 	"time"
 
+	autoscalingv1alpha1 "github.com/aibrix/aibrix/api/autoscaling/v1alpha1"
+
 	"github.com/aibrix/aibrix/pkg/controller/podautoscaler/metrics"
 	"sigs.k8s.io/controller-runtime/pkg/client"
 )
@@ -61,7 +63,7 @@ type Scaler interface {
 	// ScaleResult which contains the recommended number of pods to scale up or down to.
 	//
 	// Refer to: KpaAutoscaler.Scale Implementation
-	Scale(originalReadyPodsCount int, metricKey metrics.NamespaceNameMetric, now time.Time) ScaleResult
+	Scale(originalReadyPodsCount int, metricKey metrics.NamespaceNameMetric, now time.Time, strategy autoscalingv1alpha1.ScalingStrategyType) ScaleResult
 }
 
 // ScaleResult contains the results of a scaling decision.
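
The `Scale` signature change above is the pivot of this patch: the scaling strategy becomes a per-call argument rather than a per-instance choice, so a single autoscaler instance can serve both KPA and APA decisions. The following is a minimal sketch of the new call shape, assuming the packages imported in the hunks above; `demoStrategyDispatch` is a hypothetical helper written for illustration, not part of this change:

```go
package demo

import (
	"fmt"
	"time"

	autoscalingv1alpha1 "github.com/aibrix/aibrix/api/autoscaling/v1alpha1"
	"github.com/aibrix/aibrix/pkg/controller/podautoscaler/metrics"
	"github.com/aibrix/aibrix/pkg/controller/podautoscaler/scaler"
)

// demoStrategyDispatch (hypothetical) shows the new call shape: one Scaler
// instance serves both KPA and APA, selected per call by the final argument,
// which the reconciler reads from pa.Spec.ScalingStrategy.
func demoStrategyDispatch(s scaler.Scaler, key metrics.NamespaceNameMetric, readyPods int) {
	now := time.Now()
	kpaResult := s.Scale(readyPods, key, now, autoscalingv1alpha1.KPA)
	apaResult := s.Scale(readyPods, key, now, autoscalingv1alpha1.APA)
	// Both results carry DesiredPodCount and a ScaleValid flag.
	fmt.Println(kpaResult.DesiredPodCount, apaResult.DesiredPodCount)
}
```
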
diff --git a/pkg/controller/podautoscaler/scaler/kpa.go b/pkg/controller/podautoscaler/scaler/kpa.go
index f2b935cc..b06ee9d8 100644
--- a/pkg/controller/podautoscaler/scaler/kpa.go
+++ b/pkg/controller/podautoscaler/scaler/kpa.go
@@ -86,21 +86,31 @@ type DeciderKpaSpec struct {
 	// ScaleDownDelay is the time that must pass at reduced concurrency before a
 	// scale-down decision is applied.
 	ScaleDownDelay time.Duration
+
+	// The two following attributes are specific to APA. We may separate them from DeciderKpaSpec later.
+	// UpFluctuationTolerance represents the threshold before scaling up,
+	// which means no scaling up will occur unless the currentMetricValue exceeds the TargetValue by more than UpFluctuationTolerance.
+	// DownFluctuationTolerance represents the threshold before scaling down,
+	// which means no scaling down will occur unless the currentMetricValue is less than the TargetValue by more than DownFluctuationTolerance.
+	UpFluctuationTolerance   float64
+	DownFluctuationTolerance float64
 }
 
 // NewDefaultDeciderKpaSpec references KPA and sets up a default configuration.
 func NewDefaultDeciderKpaSpec() *DeciderKpaSpec {
 	return &DeciderKpaSpec{
-		MaxScaleUpRate:      2,                // Scale up rate of 200%, allowing rapid scaling
-		MaxScaleDownRate:    2,                // Scale down rate of 50%, for more gradual reduction
-		ScalingMetric:       "CPU",            // Metric used for scaling, here set to CPU utilization
-		TargetValue:         30.0,             // Target CPU utilization set at 10%
-		TotalValue:          100.0,            // Total CPU utilization capacity for pods is 100%
-		TargetBurstCapacity: 2.0,              // Target burst capacity to handle sudden spikes
-		ActivationScale:     1,                // Initial scaling factor upon activation
-		PanicThreshold:      2.0,              // Panic threshold set at 200% to trigger rapid scaling
-		StableWindow:        60 * time.Second, // Time window to stabilize before altering scale
-		ScaleDownDelay:      30 * time.Minute, // Delay before scaling down to avoid flapping
+		MaxScaleUpRate:           2,                // Scale up rate of 200%, allowing rapid scaling
+		MaxScaleDownRate:         2,                // Scale down rate of 50%, for more gradual reduction
+		ScalingMetric:            "CPU",            // Metric used for scaling, here set to CPU utilization
+		TargetValue:              30.0,             // Target CPU utilization set at 30%
+		TotalValue:               100.0,            // Total CPU utilization capacity for pods is 100%
+		TargetBurstCapacity:      2.0,              // Target burst capacity to handle sudden spikes
+		ActivationScale:          1,                // Initial scaling factor upon activation
+		PanicThreshold:           2.0,              // Panic threshold set at 200% to trigger rapid scaling
+		StableWindow:             60 * time.Second, // Time window to stabilize before altering scale
+		ScaleDownDelay:           30 * time.Minute, // Delay before scaling down to avoid flapping
+		UpFluctuationTolerance:   0.1,              // Tolerance for scaling up, set at 10%
+		DownFluctuationTolerance: 0.2,              // Tolerance for scaling down, set at 20%
 	}
 }
 
@@ -171,9 +181,41 @@ func NewKpaAutoscaler(readyPodsCount int, spec *DeciderKpaSpec) (*KpaAutoscaler,
 	}, nil
 }
 
+// APA_Scale references and enhances the algorithm in the following paper:
+//
+// Huo, Qizheng, et al. "High Concurrency Response Strategy based on Kubernetes Horizontal Pod Autoscaler."
+// Journal of Physics: Conference Series. Vol. 2451. No. 1. IOP Publishing, 2023.
+func (k *KpaAutoscaler) APA_Scale(currentPodCount float64, currentUsePerPod float64, spec *DeciderKpaSpec) int32 {
+	expectedUse := spec.TargetValue
+	upTolerance := spec.UpFluctuationTolerance
+	downTolerance := spec.DownFluctuationTolerance
+
+	// Check if scaling up is necessary
+	if currentUsePerPod/expectedUse > (1 + upTolerance) {
+		maxScaleUp := math.Ceil(spec.MaxScaleUpRate * currentPodCount)
+		expectedPods := int32(math.Ceil(currentPodCount * (currentUsePerPod / expectedUse)))
+		// Ensure the number of pods does not exceed the maximum scale-up limit
+		if float64(expectedPods) > maxScaleUp {
+			expectedPods = int32(maxScaleUp)
+		}
+		return expectedPods
+	} else if currentUsePerPod/expectedUse < (1 - downTolerance) { // Check if scaling down is necessary
+		maxScaleDown := math.Floor(currentPodCount / spec.MaxScaleDownRate)
+		expectedPods := int32(math.Ceil(currentPodCount * (currentUsePerPod / expectedUse)))
+		// Ensure the number of pods does not fall below the minimum scale-down limit
+		if float64(expectedPods) < maxScaleDown {
+			expectedPods = int32(maxScaleDown)
+		}
+		return expectedPods
+	}
+
+	// If the current utilization is within the expected range, maintain the current pod count
+	return int32(currentPodCount)
+}
+
 // Scale implements Scaler interface in KpaAutoscaler.
 // Refer to knative-serving: pkg/autoscaler/scaling/autoscaler.go, Scale function.
-func (k *KpaAutoscaler) Scale(originalReadyPodsCount int, metricKey metrics.NamespaceNameMetric, now time.Time) ScaleResult {
+func (k *KpaAutoscaler) Scale(originalReadyPodsCount int, metricKey metrics.NamespaceNameMetric, now time.Time, strategy autoscalingv1alpha1.ScalingStrategyType) ScaleResult {
 	/**
 	`observedStableValue` and `observedPanicValue` are calculated using different window sizes in the `MetricClient`.
 	For reference, see the KNative implementation at `pkg/autoscaler/metrics/collector.go:185`.
@@ -187,6 +229,17 @@ func (k *KpaAutoscaler) Scale(originalReadyPodsCount int, metricKey metrics.Name
 		return ScaleResult{}
 	}
 
+	if strategy == autoscalingv1alpha1.APA {
+		currentUsePerPod := observedPanicValue / float64(originalReadyPodsCount)
+		desiredPodCount := k.APA_Scale(float64(originalReadyPodsCount), currentUsePerPod, spec)
+		klog.InfoS("Use APA scaling strategy", "currentPodCount", originalReadyPodsCount, "currentUsePerPod", currentUsePerPod, "desiredPodCount", desiredPodCount)
+		return ScaleResult{
+			DesiredPodCount:     desiredPodCount,
+			ExcessBurstCapacity: 0,
+			ScaleValid:          true,
+		}
+	}
+
 	// Use 1 if there are zero current pods.
 	readyPodsCount := math.Max(1, float64(originalReadyPodsCount))
 
diff --git a/pkg/controller/podautoscaler/scaler/kpa_test.go b/pkg/controller/podautoscaler/scaler/kpa_test.go
index ef01022b..5c65e148 100644
--- a/pkg/controller/podautoscaler/scaler/kpa_test.go
+++ b/pkg/controller/podautoscaler/scaler/kpa_test.go
@@ -20,6 +20,8 @@ import (
 	"testing"
 	"time"
 
+	autoscalingv1alpha1 "github.com/aibrix/aibrix/api/autoscaling/v1alpha1"
+
 	"github.com/aibrix/aibrix/pkg/controller/podautoscaler/metrics"
 )
 
@@ -59,7 +61,7 @@ func TestKpaScale(t *testing.T) {
 	ticker := time.NewTicker(10 * time.Second)
 	defer ticker.Stop()
 
-	result := kpaScaler.Scale(readyPodCount, metricKey, now)
+	result := kpaScaler.Scale(readyPodCount, metricKey, now, autoscalingv1alpha1.KPA)
 	// recent rapid rising metric value make scaler adapt turn on panic mode
 	if result.DesiredPodCount != 10 {
 		t.Errorf("result.DesiredPodCount = 10, got %d", result.DesiredPodCount)
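
To make the APA decision rule concrete, here is a standalone restatement of the same arithmetic as `APA_Scale`, written as a pure function for easy checking. This is an illustrative sketch, not the controller code; the numbers in `main` are made up to land in each branch (5 ready pods, target 10, tolerances 0.1/0.2, both rate limits 2, matching the test's spec values):

```go
package main

import (
	"fmt"
	"math"
)

// apaDesiredPods restates the APA_Scale rule as a pure function. Parameter
// names follow the DeciderKpaSpec fields; illustrative sketch only.
func apaDesiredPods(currentPods, usePerPod, target, upTol, downTol, maxUpRate, maxDownRate float64) int32 {
	ratio := usePerPod / target
	switch {
	case ratio > 1+upTol: // above the tolerance band: scale up, capped by MaxScaleUpRate
		want := math.Ceil(currentPods * ratio)
		return int32(math.Min(want, math.Ceil(maxUpRate*currentPods)))
	case ratio < 1-downTol: // below the tolerance band: scale down, floored by MaxScaleDownRate
		want := math.Ceil(currentPods * ratio)
		return int32(math.Max(want, math.Floor(currentPods/maxDownRate)))
	}
	return int32(currentPods) // within tolerance: hold the current count
}

func main() {
	// Scale up: ratio = 32/10 = 3.2 > 1.1, want ceil(5*3.2) = 16,
	// capped at ceil(2*5) = 10 -- the same cap that yields DesiredPodCount 10 in the test above.
	fmt.Println(apaDesiredPods(5, 32, 10, 0.1, 0.2, 2, 2)) // 10

	// Hold: ratio = 10.5/10 = 1.05 sits inside [0.8, 1.1], so no rescale.
	fmt.Println(apaDesiredPods(5, 10.5, 10, 0.1, 0.2, 2, 2)) // 5

	// Scale down: ratio = 4/10 = 0.4 < 0.8, want ceil(5*0.4) = 2,
	// floored at floor(5/2) = 2.
	fmt.Println(apaDesiredPods(5, 4, 10, 0.1, 0.2, 2, 2)) // 2
}
```

Note that both rate limits bound how far a single decision can move the replica count, so a large metric spike converges to the target over several reconcile loops rather than in one jump.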