Fix KPA bug and add an elaborate KPA test case (#515)
* Use the current pod count to construct the KPA scaler; add an elaborate KPA test case

* Error return value of is not checked

---------

Co-authored-by: Jiaxin Shan <[email protected]>
kr11 and Jeffwan authored Dec 10, 2024
1 parent 98b5ca4 commit ab1c382
Showing 4 changed files with 236 additions and 26 deletions.
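Before the diffs, a quick editorial sketch of the core fix (not part of the commit): NewKpaAutoscaler previously always received a ready-pod count of 0, so its panicTime stayed at the zero value and a scaler created for an already-running workload could never start in panic mode. The helper below is a hypothetical, self-contained stand-in for that seeding logic; initialPanicTime and its arguments are illustrative names, not the repository's API.

package main

import (
	"fmt"
	"time"
)

// initialPanicTime mirrors how NewKpaAutoscaler now seeds panicTime from the
// ready-pod count passed in by the reconciler: more than one ready pod means
// the scaler starts in panic mode until enough stable-window data accumulates.
func initialPanicTime(readyPodsCount int, now time.Time) time.Time {
	if readyPodsCount > 1 {
		return now // start panicked
	}
	return time.Time{} // zero value: not in panic mode
}

func main() {
	now := time.Unix(10000, 0)
	fmt.Println(initialPanicTime(0, now).IsZero()) // true: fresh deployment, no panic
	fmt.Println(initialPanicTime(5, now).IsZero()) // false: existing pods, start in panic mode
}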
21 changes: 12 additions & 9 deletions pkg/controller/podautoscaler/podautoscaler_controller.go
@@ -328,12 +328,6 @@ func (r *PodAutoscalerReconciler) reconcileCustomPA(ctx context.Context, pa auto

setCondition(&pa, "AbleToScale", metav1.ConditionTrue, "SucceededGetScale", "the %s controller was able to get the target's current scale", paType)

// Update the scale required metrics periodically
err = r.updateMetricsForScale(ctx, pa, scale, metricKey, metricSource)
if err != nil {
r.EventRecorder.Event(&pa, corev1.EventTypeWarning, "FailedUpdateMetrics", err.Error())
return ctrl.Result{}, fmt.Errorf("failed to update metrics for scale target reference: %v", err)
}
// current scale's replica count
currentReplicasInt64, found, err := unstructured.NestedInt64(scale.Object, "spec", "replicas")
if !found {
@@ -346,6 +340,13 @@ func (r *PodAutoscalerReconciler) reconcileCustomPA(ctx context.Context, pa auto
}
currentReplicas := int32(currentReplicasInt64)

// Update the scale required metrics periodically
err = r.updateMetricsForScale(ctx, pa, scale, metricKey, metricSource, int(currentReplicas))
if err != nil {
r.EventRecorder.Event(&pa, corev1.EventTypeWarning, "FailedUpdateMetrics", err.Error())
return ctrl.Result{}, fmt.Errorf("failed to update metrics for scale target reference: %v", err)
}

// desired replica count
desiredReplicas := int32(0)
rescaleReason := ""
@@ -612,9 +613,11 @@ func (r *PodAutoscalerReconciler) updateScalerSpec(ctx context.Context, pa autos
return autoScaler.UpdateScalingContext(pa)
}

func (r *PodAutoscalerReconciler) updateMetricsForScale(ctx context.Context, pa autoscalingv1alpha1.PodAutoscaler, scale *unstructured.Unstructured, metricKey metrics.NamespaceNameMetric, metricSource autoscalingv1alpha1.MetricSource) (err error) {
// updateMetricsForScale passes currentReplicas in when constructing the autoScaler, following the Knative implementation
func (r *PodAutoscalerReconciler) updateMetricsForScale(ctx context.Context, pa autoscalingv1alpha1.PodAutoscaler, scale *unstructured.Unstructured, metricKey metrics.NamespaceNameMetric, metricSource autoscalingv1alpha1.MetricSource, currentReplicas int) (err error) {
currentTimestamp := time.Now()
var autoScaler scaler.Scaler
// it's similar to knative: pkg/autoscaler/scaling/multiscaler.go: func (m *MultiScaler) Create
autoScaler, exists := r.AutoscalerMap[metricKey]
if !exists {
klog.InfoS("Scaler not found, creating new scaler", "metricKey", metricKey, "type", pa.Spec.ScalingStrategy)
@@ -625,9 +628,9 @@ func (r *PodAutoscalerReconciler) updateMetricsForScale(ctx context.Context, pa
// TODO Currently, we initialize kpa with default config and allocate window with default length.
// We then reallocate window according to pa until UpdateScalingContext.
// it's not wrong, but we allocate window twice, to be optimized.
autoScaler, err = scaler.NewKpaAutoscaler(0, &pa)
autoScaler, err = scaler.NewKpaAutoscaler(currentReplicas, &pa, time.Now())
case autoscalingv1alpha1.APA:
autoScaler, err = scaler.NewApaAutoscaler(0, &pa)
autoScaler, err = scaler.NewApaAutoscaler(currentReplicas, &pa)
default:
return fmt.Errorf("unsupported scaling strategy: %s", pa.Spec.ScalingStrategy)
}
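To make the above ordering concrete: the reconciler now reads the current replica count from the scale subresource before calling updateMetricsForScale, so the count is available when the scaler is first constructed. Below is a minimal sketch of that lookup, using the real unstructured.NestedInt64 helper from k8s.io/apimachinery but a hypothetical in-memory scale object.

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
)

func main() {
	// Hypothetical stand-in for the scale subresource the controller fetches.
	scale := &unstructured.Unstructured{Object: map[string]interface{}{
		"spec": map[string]interface{}{"replicas": int64(3)},
	}}

	// The same nested lookup the reconciler performs before updateMetricsForScale.
	currentReplicasInt64, found, err := unstructured.NestedInt64(scale.Object, "spec", "replicas")
	if err != nil || !found {
		fmt.Println("spec.replicas not found or invalid:", err)
		return
	}
	currentReplicas := int32(currentReplicasInt64)
	fmt.Println("currentReplicas passed to the scaler:", currentReplicas)
}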
32 changes: 25 additions & 7 deletions pkg/controller/podautoscaler/scaler/kpa.go
@@ -65,6 +65,7 @@ const (
activationScaleLabel = KPALabelPrefix + "activation-scale"
panicThresholdLabel = KPALabelPrefix + "panic-threshold"
stableWindowLabel = KPALabelPrefix + "stable-window"
panicWindowLabel = KPALabelPrefix + "panic-window"
scaleDownDelayLabel = KPALabelPrefix + "scale-down-delay"
)

@@ -154,6 +155,12 @@ func (k *KpaScalingContext) UpdateByPaTypes(pa *autoscalingv1alpha1.PodAutoscale
return err
}
k.StableWindow = v
case panicWindowLabel:
v, err := time.ParseDuration(value)
if err != nil {
return err
}
k.PanicWindow = v
case scaleDownDelayLabel:
v, err := time.ParseDuration(value)
if err != nil {
Expand Down Expand Up @@ -182,7 +189,7 @@ type KpaAutoscaler struct {
var _ Scaler = (*KpaAutoscaler)(nil)

// NewKpaAutoscaler initializes a KpaAutoscaler. Referenced from `knative/pkg/autoscaler/scaling/autoscaler.go newAutoscaler`
func NewKpaAutoscaler(readyPodsCount int, pa *autoscalingv1alpha1.PodAutoscaler) (*KpaAutoscaler, error) {
func NewKpaAutoscaler(readyPodsCount int, pa *autoscalingv1alpha1.PodAutoscaler, now time.Time) (*KpaAutoscaler, error) {
spec, err := NewKpaScalingContextByPa(pa)
if err != nil {
return nil, err
@@ -207,7 +214,7 @@ func NewKpaAutoscaler(readyPodsCount int, pa *autoscalingv1alpha1.PodAutoscaler)
// accumulate enough data to make conscious decisions.
var panicTime time.Time
if readyPodsCount > 1 {
panicTime = time.Now()
panicTime = now
} else {
panicTime = time.Time{} // Zero value for time if not in panic mode
}
@@ -297,22 +304,26 @@ func (k *KpaAutoscaler) Scale(originalReadyPodsCount int, metricKey metrics.Name
"PanicThreshold", spec.PanicThreshold, "isOverPanicThreshold", isOverPanicThreshold,
)

if k.panicTime.IsZero() && isOverPanicThreshold {
if !k.InPanicMode() && isOverPanicThreshold {
// Begin panicking when we cross the threshold in the panic window.
klog.InfoS("Begin panicking")
klog.InfoS("Begin panicking.", "panicTime", now)
k.panicTime = now
} else if isOverPanicThreshold {
// If we're still over panic threshold right now — extend the panic window.
klog.V(4).InfoS("update panic time.", "panicTime", now)
k.panicTime = now
} else if !k.panicTime.IsZero() && !isOverPanicThreshold && k.panicTime.Add(spec.StableWindow).Before(now) {
// Stop panicking after the surge has made its way into the stable metric.
} else if k.InPanicMode() && !isOverPanicThreshold && k.panicTime.Add(spec.StableWindow).Before(now) {
// Stop panicking only if all of the following hold:
// 1. the scaler is currently in panic mode (k.InPanicMode())
// 2. the current metric value is no longer over the panic threshold
// 3. the stable window has elapsed since the metric value last exceeded the panic threshold.
klog.InfoS("Exit panicking.")
k.panicTime = time.Time{}
k.maxPanicPods = 0
}

desiredPodCount := desiredStablePodCount
if !k.panicTime.IsZero() {
if k.InPanicMode() {
// In some edgecases stable window metric might be larger
// than panic one. And we should provision for stable as for panic,
// so pick the larger of the two.
@@ -438,3 +449,10 @@ func (k *KpaAutoscaler) GetScalingContext() scalingcontext.ScalingContext {

return k.scalingContext
}

func (k *KpaAutoscaler) InPanicMode() bool {
k.specMux.Lock()
defer k.specMux.Unlock()

return !k.panicTime.IsZero()
}
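The enter/extend/exit transitions added to Scale above can be summarized in isolation. The following is a stripped-down, hypothetical sketch of that state machine (its types and names are not the repository's), assuming the same rules: enter panic when crossing the threshold, refresh panicTime while still over it, and exit only after a full stable window below it.

package main

import (
	"fmt"
	"time"
)

// panicState is a hypothetical stand-in for KpaAutoscaler's panic bookkeeping.
type panicState struct {
	panicTime    time.Time
	maxPanicPods int32
}

func (p *panicState) inPanicMode() bool { return !p.panicTime.IsZero() }

// step mirrors the enter/extend/exit transitions in the revised Scale logic.
func (p *panicState) step(now time.Time, overThreshold bool, stableWindow time.Duration) {
	switch {
	case !p.inPanicMode() && overThreshold:
		p.panicTime = now // begin panicking
	case overThreshold:
		p.panicTime = now // still over the threshold: extend the panic window
	case p.inPanicMode() && p.panicTime.Add(stableWindow).Before(now):
		p.panicTime = time.Time{} // a full stable window below the threshold: exit
		p.maxPanicPods = 0
	}
}

func main() {
	s := &panicState{}
	base := time.Unix(10000, 0)
	s.step(base, true, 30*time.Second)                      // enter panic mode
	s.step(base.Add(10*time.Second), true, 30*time.Second)  // extend the panic window
	s.step(base.Add(50*time.Second), false, 30*time.Second) // stable window elapsed: exit
	fmt.Println("in panic mode:", s.inPanicMode())          // false
}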
206 changes: 197 additions & 9 deletions pkg/controller/podautoscaler/scaler/kpa_test.go
@@ -17,15 +17,18 @@ limitations under the License.
package scaler

import (
"flag"
"fmt"
"testing"
"time"

"k8s.io/klog/v2"

"github.com/aibrix/aibrix/pkg/controller/podautoscaler/aggregation"
"github.com/aibrix/aibrix/pkg/controller/podautoscaler/algorithm"
scalingcontext "github.com/aibrix/aibrix/pkg/controller/podautoscaler/common"

autoscalingv1alpha1 "github.com/aibrix/aibrix/api/autoscaling/v1alpha1"
v1alpha1 "github.com/aibrix/aibrix/api/autoscaling/v1alpha1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

Expand Down Expand Up @@ -76,15 +79,15 @@ func TestKpaScale(t *testing.T) {
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()

pa := autoscalingv1alpha1.PodAutoscaler{
pa := v1alpha1.PodAutoscaler{
ObjectMeta: metav1.ObjectMeta{
Namespace: "test_ns",
},
Spec: autoscalingv1alpha1.PodAutoscalerSpec{
MetricsSources: []autoscalingv1alpha1.MetricSource{
Spec: v1alpha1.PodAutoscalerSpec{
MetricsSources: []v1alpha1.MetricSource{
{
MetricSourceType: autoscalingv1alpha1.POD,
ProtocolType: autoscalingv1alpha1.HTTP,
MetricSourceType: v1alpha1.POD,
ProtocolType: v1alpha1.HTTP,
TargetMetric: spec.ScalingMetric,
TargetValue: fmt.Sprintf("%f", spec.TargetValue),
},
@@ -109,15 +112,15 @@ }
}

func TestKpaUpdateContext(t *testing.T) {
pa := &autoscalingv1alpha1.PodAutoscaler{
Spec: autoscalingv1alpha1.PodAutoscalerSpec{
pa := &v1alpha1.PodAutoscaler{
Spec: v1alpha1.PodAutoscalerSpec{
ScaleTargetRef: corev1.ObjectReference{
Kind: "Deployment",
Name: "example-deployment",
},
MinReplicas: nil, // expecting nil as default since it's a pointer and no value is assigned
MaxReplicas: 5,
MetricsSources: []autoscalingv1alpha1.MetricSource{
MetricsSources: []v1alpha1.MetricSource{
{
Endpoint: "service1.example.com",
Path: "/api/metrics/cpu",
@@ -135,6 +138,7 @@ func TestKpaUpdateContext(t *testing.T) {
"kpa.autoscaling.aibrix.ai/activation-scale": "3",
"kpa.autoscaling.aibrix.ai/panic-threshold": "2.5",
"kpa.autoscaling.aibrix.ai/stable-window": "60s",
"kpa.autoscaling.aibrix.ai/panic-window": "50s",
"kpa.autoscaling.aibrix.ai/scale-down-delay": "30s",
},
},
@@ -162,7 +166,191 @@ func TestKpaUpdateContext(t *testing.T) {
if kpaSpec.StableWindow != 60*time.Second {
t.Errorf("expected StableWindow = 60s, got %v", kpaSpec.StableWindow)
}
if kpaSpec.PanicWindow != 50*time.Second {
t.Errorf("expected PanicWindow = 50s, got %v", kpaSpec.PanicWindow)
}
if kpaSpec.ScaleDownDelay != 30*time.Second {
t.Errorf("expected ScaleDownDelay = 10s, got %v", kpaSpec.ScaleDownDelay)
}
}

// checkInPanic checks that the AutoScaler's panic status is as expected
func checkInPanic(t *testing.T, expectIn bool, autoScaler *KpaAutoscaler) {
if expectIn && !autoScaler.InPanicMode() {
t.Fatalf("should be in panic mode")
}
if !expectIn && autoScaler.InPanicMode() {
t.Fatalf("shouldn't be in panic mode")
}
}

// TestKpaScale2 simulates creating the KPA scaler the same way `PodAutoscalerReconciler.updateMetricsForScale` does.
func TestKpaScale2(t *testing.T) {
klog.InitFlags(nil)
_ = flag.Set("v", "4")
_ = flag.Set("v", "2")

PANIC_WINDOW := 10 * time.Second
STABLE_WINDOW := 30 * time.Second
SCALE_DOWN_DELAY := 20 * time.Second
pa := &v1alpha1.PodAutoscaler{
ObjectMeta: metav1.ObjectMeta{
Namespace: "test_ns",
Name: "test_llm_for_pa",
Labels: map[string]string{
"kpa.autoscaling.aibrix.ai/stable-window": STABLE_WINDOW.String(),
"kpa.autoscaling.aibrix.ai/panic-window": PANIC_WINDOW.String(),
"kpa.autoscaling.aibrix.ai/scale-down-delay": SCALE_DOWN_DELAY.String(),
},
},
Spec: v1alpha1.PodAutoscalerSpec{
ScaleTargetRef: corev1.ObjectReference{
Kind: "Deployment",
Name: "example-deployment",
},
MinReplicas: nil, // expecting nil as default since it's a pointer and no value is assigned
MaxReplicas: 5,
MetricsSources: []v1alpha1.MetricSource{
{
MetricSourceType: v1alpha1.POD,
ProtocolType: v1alpha1.HTTP,
Path: "metrics",
Port: "8000",
TargetMetric: "ttot",
TargetValue: "50",
},
},
ScalingStrategy: "KPA",
},
}

readyPodCount := 5
now := time.Unix(int64(10000), 0)
autoScaler, err := NewKpaAutoscaler(readyPodCount, pa, now)
if err != nil {
t.Errorf("NewKpaAutoscaler() failed: %v", err)
}

kpaMetricsClient, ok := autoScaler.metricClient.(*metrics.KPAMetricsClient)
if !ok {
t.Errorf("autoscaler.metricClient is not of type *metrics.KPAMetricsClient")
return
}

ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()
metricKey, _, err := metrics.NewNamespaceNameMetric(pa)
if err != nil {
t.Errorf("NewNamespaceNameMetric() failed: %v", err)
}

type TestData struct {
ts time.Time
totalValue float64
desiredPodCount int32
checkScalerAttr func()
}

//// add a time delta to trigger
//time.Sleep(1 * time.Second)

// The KPA scaling test scenario is as follows:
testDataList := []TestData{
// 1. KPA initially starts in panic mode.
{now.Add(0 * time.Second), 10.0, 5,
func() {
checkInPanic(t, true, autoScaler)
}},
// 2. Metrics remain at a low level, and no scaling down occurs due to panic mode.
{now.Add(10 * time.Second), 10.0, 5,
func() {
checkInPanic(t, true, autoScaler)
}},
{now.Add(20 * time.Second), 10.0, 5,
func() {
checkInPanic(t, true, autoScaler)
}},
// this is the right closed boundary of the panic window; the scaler will exit panic mode at the next update
{now.Add(STABLE_WINDOW), 10.0, 5,
func() {
checkInPanic(t, true, autoScaler)
if m, _ := autoScaler.delayWindow.Max(); m != 5 {
t.Fatalf("max delayWindow should be 5")
}
}},
// 3. After a sustained period of low metrics, KPA exits panic mode, but scaling down is still delayed because of the DelayWindow.
{now.Add(40 * time.Second), 10.0, 5,
func() {
checkInPanic(t, false, autoScaler)
if m, _ := autoScaler.delayWindow.Max(); m != 5 {
t.Fatalf("max delayWindow should be 5")
}
expectDelayWindow := "TimeWindow(granularity=1s, window=window(size=2, values=[{10030, 5.00}, {10040, 2.00}]))"
if s := autoScaler.delayWindow.String(); s != expectDelayWindow {
t.Fatalf("unexpected delayWindow: expected %s, got %s", expectDelayWindow, s)
}
}},
// 4. The max over the DelayWindow has dropped to a low level, so scale down
{now.Add(STABLE_WINDOW + SCALE_DOWN_DELAY), 10.0, 2,
func() {
checkInPanic(t, false, autoScaler)
if m, _ := autoScaler.delayWindow.Max(); m != 2 {
t.Fatalf("max delayWindow should be 2")
}
}},
// 5. Metrics start to increase; the PanicWindow suggests scaling up but does not exceed the panic threshold.
// The stableWindow suggests keeping the replica count at 2. KPA does not enter panic mode.
{now.Add(60 * time.Second), 150.0, 2,
func() {
checkInPanic(t, false, autoScaler)
}},
// 6. Metrics continue increasing. The stableWindow suggests scaling up to 3, still without exceeding the panic threshold,
// so KPA scales up without entering panic mode.
{now.Add(70 * time.Second), 150.0, 3,
func() {
checkInPanic(t, false, autoScaler)
}},
// 7. Metrics continue to rise and the scaling ratio exceeds the panic threshold, so panic mode starts
{now.Add(80 * time.Second), 300.0, 6,
func() {
checkInPanic(t, true, autoScaler)
if autoScaler.panicTime != now.Add(80*time.Second) {
t.Fatalf("Expected panicTime to be %v, got %v", now.Add(80*time.Second), autoScaler.panicTime)
}
}},
// 8. Metrics continue increasing, so the panic time is updated
{now.Add(90 * time.Second), 600.0, 12,
func() {
checkInPanic(t, true, autoScaler)
if autoScaler.panicTime != now.Add(90*time.Second) {
t.Fatalf("Expected panicTime to be %v, got %v", now.Add(90*time.Second), autoScaler.panicTime)
}
}},
// 9. Metrics stabilize and stop increasing; KPA remains in panic mode but no longer updates the panicTime.
{now.Add(100 * time.Second), 300.0, 12,
func() {
checkInPanic(t, true, autoScaler)
if autoScaler.panicTime != now.Add(90*time.Second) {
t.Fatalf("Expected panicTime to be %v, got %v", now.Add(90*time.Second), autoScaler.panicTime)
}
}},
}
for _, testData := range testDataList {
t.Logf("--- test ts=%v: totalValue=%.2f expect=%d", testData.ts.Unix(), testData.totalValue, testData.desiredPodCount)
err := kpaMetricsClient.UpdateMetricIntoWindow(testData.ts, testData.totalValue)
if err != nil {
t.Fatalf("failed to update metric: %v", err)
}

result := autoScaler.Scale(readyPodCount, metricKey, testData.ts)
testData.checkScalerAttr()

if result.DesiredPodCount != testData.desiredPodCount {
t.Fatalf("expected DesiredPodCount = %d, got %d", testData.desiredPodCount, result.DesiredPodCount)
}
// carry the latest desired pod count into the next iteration
readyPodCount = int(result.DesiredPodCount)
t.Logf("scale pod count to %d", readyPodCount)
}

}
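For anyone reproducing this locally, the new scenario test should be runnable on its own with the standard Go tooling from the repository root; something like the following (the -run pattern and verbosity flag are illustrative):

go test ./pkg/controller/podautoscaler/scaler/ -run TestKpaScale2 -v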
