This repository has been archived by the owner on Apr 18, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathloadshed.go
165 lines (146 loc) · 6.33 KB
/
loadshed.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
package loadshed
import (
"fmt"
"math/rand"
"time"
"github.com/asecurityteam/rolling"
)
// Doer wraps the Do method. Do executes the supplied function and returns
// its error, or an error of its own if the call is not executed (e.g. when
// an implementation sheds the call under load).
type Doer interface {
	Do(func() error) error
}
// wrapper is the interface implemented by the loadshed feeders: decorators
// that wrap a unit of work in order to record load data (latency, errors,
// concurrency) as it runs.
type wrapper interface {
	Wrap(func() error) func() error
}
// Option is a partial initializer for Loadshed. Each Option mutates the
// given Loadshed (appending aggregators and/or decorators) and returns it
// so that New can apply options in sequence.
type Option func(*Loadshed) *Loadshed

// defaultHint is the preallocation hint substituted whenever a caller
// passes a preallocHint of less than one.
const defaultHint = 1000
// PercentileLatency generates an option much like AverageLatency except the
// aggregation is computed as a percentile of the recorded data rather than
// an average. The percentile should be given as N%, e.g. 95.0 or 99.0;
// fractional percentiles such as 99.9 are also valid. While the computed
// percentile falls between lower and upper a percentage of new requests is
// rejected. The rolling window spans buckets intervals of bucketSize; a
// preallocHint below one selects the default.
func PercentileLatency(lower float64, upper float64, bucketSize time.Duration, buckets int, preallocHint int, requiredPoints int, percentile float64) Option {
	return func(ls *Loadshed) *Loadshed {
		hint := preallocHint
		if hint < 1 {
			hint = defaultHint
		}
		window := rolling.NewTimeWindow(bucketSize, buckets, hint)
		// Build the rollup pipeline: percentile -> rejection chance,
		// gated on having at least requiredPoints data points.
		percentileRollup := rolling.NewPercentileRollup(percentile, window, hint, fmt.Sprintf("P%fLatency", percentile))
		chanceRollup := rolling.NewPercentageRollup(percentileRollup, lower, upper, fmt.Sprintf("ChanceP%fLatency", percentile))
		agg := rolling.NewLimitedRollup(requiredPoints, window, chanceRollup)
		ls.aggregators = append(ls.aggregators, agg)
		ls.chain = append(ls.chain, newLatencyTrackingDecorator(window).Wrap)
		return ls
	}
}
// AverageLatency generates an option that adds average request latency
// within a rolling time window to the load shedding calculation. If the
// average value, in seconds, falls between lower and upper then a
// percentage of new requests will be rejected. The rolling window is
// configured by a bucket size and a number of buckets. The preallocHint is
// an optimisation for keeping the number of alloc calls low; a hint below
// one selects the default.
func AverageLatency(lower float64, upper float64, bucketSize time.Duration, buckets int, preallocHint int, requiredPoints int) Option {
	return func(ls *Loadshed) *Loadshed {
		hint := preallocHint
		if hint < 1 {
			hint = defaultHint
		}
		window := rolling.NewTimeWindow(bucketSize, buckets, hint)
		// Average latency -> rejection chance, gated on requiredPoints.
		avgRollup := rolling.NewAverageRollup(window, "AverageLatency")
		chanceRollup := rolling.NewPercentageRollup(avgRollup, lower, upper, "ChanceAverageLatency")
		agg := rolling.NewLimitedRollup(requiredPoints, window, chanceRollup)
		ls.aggregators = append(ls.aggregators, agg)
		ls.chain = append(ls.chain, newLatencyTrackingDecorator(window).Wrap)
		return ls
	}
}
// ErrorRate generates an option that adds the error rate within a rolling
// time window to the load shedding calculation. If the error rate falls
// between lower and upper then a percentage of new requests will be
// rejected. The rolling window is configured by a bucket size and a number
// of buckets. The preallocHint is an optimisation for keeping the number of
// alloc calls low; a hint below one selects the default.
func ErrorRate(lower float64, upper float64, bucketSize time.Duration, buckets int, preallocHint int, requiredPoints int) Option {
	return func(ls *Loadshed) *Loadshed {
		hint := preallocHint
		if hint < 1 {
			hint = defaultHint
		}
		// Two parallel windows over the same period: one counts failed
		// requests, the other counts all requests.
		errWindow := rolling.NewTimeWindow(bucketSize, buckets, hint)
		reqWindow := rolling.NewTimeWindow(bucketSize, buckets, hint)
		rate := newErrRate(errWindow, reqWindow, requiredPoints, "ErrorRate", hint)
		ls.aggregators = append(ls.aggregators, rolling.NewPercentageRollup(rate, lower, upper, "ChanceErrorRate"))
		ls.chain = append(ls.chain, newErrorRateDecorator(errWindow, reqWindow).Wrap)
		return ls
	}
}
// Concurrency generates an option that adds the number of concurrent
// requests in flight to the load shedding calculation. Once that number is
// between lower and upper the Decorator begins rejecting new requests based
// on the distance between the threshold values. A nil wg is replaced with a
// freshly constructed WaitGroup.
func Concurrency(lower int, upper int, wg *WaitGroup) Option {
	return func(ls *Loadshed) *Loadshed {
		tracker := wg
		if tracker == nil {
			tracker = NewWaitGroup()
		}
		chanceRollup := rolling.NewPercentageRollup(tracker, float64(lower), float64(upper), "ChanceConcurrency")
		ls.aggregators = append(ls.aggregators, chanceRollup)
		ls.chain = append(ls.chain, newConcurrencyTrackingDecorator(tracker).Wrap)
		return ls
	}
}
// CPU generates an option that adds a rolling average of CPU usage to the
// load shedding calculation. It configures the Decorator to reject a
// percentage of traffic once the average CPU usage lies between lower and
// upper. Usage is sampled every pollingInterval over windowSize samples.
func CPU(lower float64, upper float64, pollingInterval time.Duration, windowSize int) Option {
	return func(ls *Loadshed) *Loadshed {
		usage := newAvgCPU(pollingInterval, windowSize)
		ls.aggregators = append(ls.aggregators, rolling.NewPercentageRollup(usage, lower, upper, "ChanceCPU"))
		return ls
	}
}
// Aggregator adds an arbitrary Aggregator to the evaluation for load
// shedding. Its aggregate result is interpreted as a value between 0.0 and
// 1.0 and used as the percentage of requests to reject.
func Aggregator(a rolling.Aggregator) Option {
	return func(ls *Loadshed) *Loadshed {
		ls.aggregators = append(ls.aggregators, a)
		return ls
	}
}
var zeroAggregator = rolling.NewSumRollup(rolling.NewPointWindow(1), "Zero")
// Loadshed is a struct containing all the aggregators that rejects a
// percentage of requests based on aggregation of system load data.
type Loadshed struct {
	random      func() float64                // source of [0,1) values used for the rejection roll
	aggregators []rolling.Aggregator          // each yields a rejection chance; Do uses the max
	chain       []func(func() error) func() error // decorators that record load data around each call
}
// Do evaluates the configured aggregators and either rejects the call —
// returning a Rejected error carrying the triggering aggregate — or wraps
// runfn in the tracking decorator chain and executes it, returning its
// error.
func (l *Loadshed) Do(runfn func() error) error {
	// Take the highest rejection chance reported by any aggregator.
	var result *rolling.Aggregate
	for _, aggregator := range l.aggregators {
		var r = aggregator.Aggregate()
		if result == nil || r.Value > result.Value {
			result = r
		}
	}
	// Guard against a zero-value Loadshed (not built via New): with no
	// aggregators, result stays nil and the original code panicked on
	// result.Value (and would then call a nil l.random). In that case we
	// simply never shed.
	if result != nil && l.random() < result.Value {
		return Rejected{Aggregate: result}
	}
	// Wrap the work in each tracking decorator so load data is recorded.
	for _, c := range l.chain {
		runfn = c(runfn)
	}
	return runfn()
}
// New generates a Loadshed struct that sheds load based on some
// definition of system load supplied via options.
func New(options ...Option) *Loadshed {
	// Seed a private PRNG for the rejection roll in Do.
	var r = rand.New(rand.NewSource(time.Now().UnixNano()))
	var lo = &Loadshed{random: r.Float64}
	for _, option := range options {
		lo = option(lo)
	}
	// With no options configured, fall back to an aggregator so Do always
	// has something to evaluate.
	if len(lo.aggregators) < 1 {
		lo.aggregators = append(lo.aggregators, zeroAggregator)
	}
	return lo
}