Skip to content

Commit

Permalink
Add kpa scale algorithm implementation (#87)
Browse files Browse the repository at this point in the history
* add a demo KPA scale policy implementation.

* add some docstring

* refact kpa code, move temporary data structure outside kpa.go

* refact KPA, extract interface

1. Added window and timeWindow class.
2. Abstracted the Autoscaler class to encapsulate configuration and state information for scalers. All scalers, such as KpaScaler, now include an Autoscaler attribute.
3. Introduced the Scaler interface, which includes the Scale method. All scalers, including KpaScaler, are required to implement the Scaler.Scale method.

* fix go imports
  • Loading branch information
kr11 authored Aug 20, 2024
1 parent af45221 commit 197ded0
Show file tree
Hide file tree
Showing 5 changed files with 605 additions and 0 deletions.
122 changes: 122 additions & 0 deletions pkg/controller/podautoscaler/aggregation/window.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
/*
Copyright 2020 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package aggregation

import (
"math"
"time"
)

/*
*
Referenced the knative implementation: pkg/autoscaler/aggregation/max/window.go,
but did not use the Ascending Minima Algorithm as we may need other aggregation functions beyond Max.
*/
type entry struct {
value int32
index int
}

// window is a sliding window that keeps track of recent {size} values.
type window struct {
valueList []entry
first, length int
}

// newWindow creates a new window of specified size.
func newWindow(size int) *window {
return &window{
valueList: make([]entry, size),
}
}

func (w *window) Size() int {
return len(w.valueList)
}

func (w *window) index(i int) int {
return i % w.Size()
}

// Record updates the window with a new value and index.
// It also removes all entries that are too old (index too small compared to the new index).
func (w *window) Record(value int32, index int) {
// Remove elements that are outside the sliding window range.
for w.length > 0 && w.valueList[w.first].index <= index-w.Size() {
w.first = w.index(w.first + 1)
w.length--
}

// Add the new value to the valueList.
if w.length < w.Size() { // Ensure we do not exceed the buffer
w.valueList[w.index(w.first+w.length)] = entry{value: value, index: index}
w.length++
}
}

// Max returns the maximum value in the current window.
func (w *window) Max() int32 {
if w.length > 0 {
maxValue := w.valueList[w.first].value
for i := 1; i < w.length; i++ {
valueIndex := w.index(w.first + i)
if w.valueList[valueIndex].value > maxValue {
maxValue = w.valueList[valueIndex].value
}
}
return maxValue
}
return -1 // return a default value if no entries exist
}

// Min returns the minimum value in the current window.
func (w *window) Min() int32 {
if w.length > 0 {
minValue := w.valueList[w.first].value
for i := 1; i < w.length; i++ {
valueIndex := w.index(w.first + i)
if w.valueList[valueIndex].value < minValue {
minValue = w.valueList[valueIndex].value
}
}
return minValue
}
return -1 // return a default value if no entries exist
}

type TimeWindow struct {
window *window
granularity time.Duration
}

func NewTimeWindow(duration, granularity time.Duration) *TimeWindow {
buckets := int(math.Ceil(float64(duration) / float64(granularity)))
return &TimeWindow{window: newWindow(buckets), granularity: granularity}
}

func (t *TimeWindow) Record(now time.Time, value int32) {
index := int(now.Unix()) / int(t.granularity.Seconds())
t.window.Record(value, index)
}

func (t *TimeWindow) Max() int32 {
return t.window.Max()
}

func (t *TimeWindow) Min() int32 {
return t.window.Min()
}
95 changes: 95 additions & 0 deletions pkg/controller/podautoscaler/aggregation/window_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
Copyright 2020 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package aggregation

import (
"testing"
"time"
)

func TestWindow(t *testing.T) {
win := newWindow(5)
values := []int32{0, 1, 2, 3, 4}
indices := []int{0, 1, 2, 3, 4}

for i, v := range values {
win.Record(v, indices[i])
}

if max := win.Max(); max != 4 {
t.Errorf("Expected max 4, got %d", max)
}
if min := win.Min(); min != 0 {
t.Errorf("Expected min 4, got %d", min)
}

// Test sliding out old values
win.Record(6, 6) // This should slide out the 1
if min := win.Min(); min != 2 {
t.Errorf("Expected min 2 after sliding out, got %d", min)
}

win.Record(8, 8) // This should slide out the 1
if min := win.Min(); min != 4 {
t.Errorf("Expected min 4 after sliding out, got %d", min)
}
}

func TestTimeWindow(t *testing.T) {
tw := NewTimeWindow(5*time.Second, 1*time.Second)
//now := time.Now()
now := time.Time{}
values := []int32{0, 1, 2, 3, 4}

for i, v := range values {
tw.Record(now.Add(time.Duration(i)*time.Second), v)
}

if max := tw.Max(); max != 4 {
t.Errorf("Expected max 4, got %d", max)
}

// test sliding out old values
tw.Record(now.Add(time.Duration(8)*time.Second), 8)

if min := tw.Min(); min != 4 {
t.Errorf("Entry 8, the valid min bound is 8-5+1=4. Expected min 4, got %d", min)
}

// test case of circular array
tw.Record(now.Add(time.Duration(9)*time.Second), 9)

if min := tw.Min(); min != 8 {
t.Errorf("Entry 8, the valid min bound is 9-5+1=5. Expected min 8, got %d", min)
}
}

func TestDelayWindow(t *testing.T) {
delayWindow := NewTimeWindow(10*time.Second, 1*time.Second)

now := time.Now()
desiredPodCount := int32(10)
delayWindow.Record(now, desiredPodCount)

// Simulate a short delay and a new decision within the same interval
delayWindow.Record(now.Add(500*time.Millisecond), desiredPodCount-1)
delayedPodCount := delayWindow.Max()

if delayedPodCount != desiredPodCount {
t.Errorf("Expected delayed count to match original, got %d", delayedPodCount)
}
}
128 changes: 128 additions & 0 deletions pkg/controller/podautoscaler/scaler/interfaces.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
/*
Copyright 2024 The Aibrix Team.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package kpa

import (
"sync"
"time"
)

/**
This implementation is inspired by the scaling solutions provided by Knative.
Our implementation specifically mimics and adapts the autoscaling functionality found in:
- autoscaler: pkg/autoscaler/scaling/autoscaler.go
- Scaler(interface): pkg/autoscaler/scaling/autoscaler.go
- DeciderSpec: pkg/autoscaler/scaling/multiscaler.go
- ScaleResult: pkg/autoscaler/scaling/multiscaler.go
*/

// Autoscaler represents an instance of the autoscaling engine.
// It encapsulates all the necessary data and state needed for scaling decisions.
// Refer to: KpaScaler
type Autoscaler struct {
// specMux guards the current DeciderSpec.
specMux sync.RWMutex
podCounter int
deciderSpec *DeciderSpec
Status DeciderStatus
}

// Scaler is an interface that defines the scaling operations.
// Any autoscaler implementation, such as KpaScaler (Kubernetes Pod Autoscaler),
// needs to implement this interface to respond to scale events.
type Scaler interface {
// Scale calculates the necessary scaling action based on the observed metrics
// and the current time. This method is the core of the autoscaling logic.
//
// Parameters:
// observedStableValue - the metric value (e.g., CPU utilization) averaged over a stable period.
// observedPanicValue - the metric value observed during a short, recent period which may indicate a spike or drop.
// now - the current time, used to determine if scaling actions are needed based on time-based rules or delays.
//
// Returns:
// ScaleResult which contains the recommended number of pods to scale up or down to.
//
// Refer to: KpaScaler.Scale Implementation
Scale(observedStableValue float64, observedPanicValue float64, now time.Time) ScaleResult
}

// DeciderSpec defines parameters for scaling decisions.
type DeciderSpec struct {
// Maximum rate at which to scale up
MaxScaleUpRate float64
// Maximum rate at which to scale down
MaxScaleDownRate float64
// The metric used for scaling, i.e. CPU, Memory, QPS.
ScalingMetric string
// The value of scaling metric per pod that we target to maintain.
TargetValue float64
// The total value of scaling metric that a pod can maintain.
TotalValue float64
// The burst capacity that user wants to maintain without queuing at the POD level.
// Note, that queueing still might happen due to the non-ideal load balancing.
TargetBurstCapacity float64
// ActivationScale is the minimum, non-zero value that a service should scale to.
// For example, if ActivationScale = 2, when a service scaled from zero it would
// scale up two replicas in this case. In essence, this allows one to set both a
// min-scale value while also preserving the ability to scale to zero.
// ActivationScale must be >= 2.
ActivationScale int32

// TODO: Note that the following attributes are specific to Knative; but we retain them here temporarily.
// PanicThreshold is the threshold at which panic mode is entered. It represents
// a factor of the currently observed load over the panic window over the ready
// pods. I.e. if this is 2, panic mode will be entered if the observed metric
// is twice as high as the current population can handle.
PanicThreshold float64
// StableWindow is needed to determine when to exit panic mode.
StableWindow time.Duration
// ScaleDownDelay is the time that must pass at reduced concurrency before a
// scale-down decision is applied.
ScaleDownDelay time.Duration
}

// DeciderStatus is the current scale recommendation.
type DeciderStatus struct {
// DesiredScale is the target number of instances that autoscaler
// this revision needs.
DesiredScale int32

// TODO: ExcessBurstCapacity might be a general attribute since it describes
// how much capacity users want to keep for preparing for burst traffic.

// ExcessBurstCapacity is the difference between spare capacity
// (how much more load the pods in the revision deployment can take before being
// overloaded) and the configured target burst capacity.
// If this number is negative: Activator will be threaded in
// the request path by the PodAutoscaler controller.
ExcessBurstCapacity int32
}

// ScaleResult contains the results of a scaling decision.
type ScaleResult struct {
// DesiredPodCount is the number of pods Autoscaler suggests for the revision.
DesiredPodCount int32
// ExcessBurstCapacity is computed headroom of the revision taking into
// the account target burst capacity.
ExcessBurstCapacity int32
// ScaleValid specifies whether this scale result is valid, i.e. whether
// Autoscaler had all the necessary information to compute a suggestion.
ScaleValid bool
}
Loading

0 comments on commit 197ded0

Please sign in to comment.