-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathbudget.go
236 lines (191 loc) · 5.36 KB
/
budget.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
package retry
import (
"fmt"
"math"
"sync"
"time"
)
// Budget implements a retry budget, i.e. a limit for retries.
// Capping the number of retries sent to a service helps to mitigate cascading
// failures.
//
// To add a retry budget for a specific service or backend, declare one Budget
// variable and share it between all Do() calls. See the example for a
// demonstration.
//
// Budget tracks the rate of initial calls and the rate of retries over a
// moving one minute window. Retries are dropped when the rate of retries
// exceeds Budget.Rate and, at the same time, the ratio of retries exceeds
// Budget.Ratio; Do() then returns ErrExhausted.
//
// Budget implements the Option interface.
type Budget struct {
	// Rate is the minimum rate of retries, in calls per second. As long as
	// fewer retries than this are attempted, retries are never throttled.
	Rate float64

	// Ratio is the maximum ratio of retries.
	//
	// As an option to Do(), it is the ratio of retries to initial calls and
	// lies in the [0.0, Attempts()] range; the initial request is never
	// dropped.
	//
	// As part of BudgetHandler, it is the ratio of retries to total requests
	// and lies in the [0.0, 1.0] range.
	Ratio float64

	// mu guards initialCalls and retriedCalls. Both are created lazily the
	// first time the budget is consulted.
	mu sync.Mutex

	initialCalls, retriedCalls *movingRate
}
// apply implements the option by storing b as the budget in o.
func (b *Budget) apply(o *internalOptions) { o.budget = b }
// sendOK reports, on the client side, whether a call may be sent.
//
// The first (non-retried) call is always permitted and is recorded in the
// initial-call rate. A retry is dropped only when BOTH the current retry rate
// exceeds b.Rate AND the ratio of retries to initial calls exceeds b.Ratio.
// Dropped retries are not accounted; permitted retries are. A nil Budget
// permits every call.
func (b *Budget) sendOK(isRetry bool) bool {
	if b == nil {
		return true
	}

	b.mu.Lock()
	defer b.mu.Unlock()

	// Create the moving rates on first use so a Budget declared without a
	// constructor works.
	if b.retriedCalls == nil {
		b.retriedCalls = newMovingRate()
	}
	if b.initialCalls == nil {
		b.initialCalls = newMovingRate()
	}

	t := time.Now()
	if !isRetry {
		b.initialCalls.Add(t, 1)
		return true
	}

	initialRate := b.initialCalls.Rate(t)
	retriedRate := b.retriedCalls.Rate(t)

	// b.Rate is a floor on the *retry* rate: below it retries are never
	// throttled, regardless of the ratio (see the Budget.Rate docs).
	// Comparisons against NaN are false, so an undefined rate never blocks.
	if retriedRate > b.Rate && retriedRate/initialRate > b.Ratio {
		// Blocked retries are deliberately not accounted.
		return false
	}

	b.retriedCalls.Add(t, 1)
	return true
}
// overload reports, on the server side, whether the cluster appears to be in
// overload.
//
// Every request is accounted, initial calls and retries alike, even when
// overload is signaled, and the result may be true for initial (non-retried)
// requests too. A nil Budget has nothing to enforce and never signals
// overload.
func (b *Budget) overload(isRetry bool) bool {
	if b == nil {
		// No budget configured: never report overload. Previously this
		// returned true, which flagged every request as overloaded.
		return false
	}

	b.mu.Lock()
	defer b.mu.Unlock()

	// Create the moving rates on first use.
	if b.retriedCalls == nil {
		b.retriedCalls = newMovingRate()
	}
	if b.initialCalls == nil {
		b.initialCalls = newMovingRate()
	}

	t := time.Now()
	if isRetry {
		b.retriedCalls.Add(t, 1)
	} else {
		b.initialCalls.Add(t, 1)
	}

	initialRate := b.initialCalls.Rate(t)
	retriedRate := b.retriedCalls.Rate(t)

	// TODO(octo): this calculates the ratio as retried/total, while
	// sendOK() uses retried/initial. That's confusing.
	totalRate := initialRate + retriedRate
	// Overload requires both a total request rate above b.Rate and a retry
	// share above b.Ratio.
	return totalRate > b.Rate && retriedRate/totalRate > b.Ratio
}
// timeRoundDown returns t rounded down to a multiple of d, measured from the
// zero time. If d <= 0, t is returned unchanged apart from having any
// monotonic clock reading stripped.
func timeRoundDown(t time.Time, d time.Duration) time.Time {
	// Truncate always rounds down, unlike Round, which rounds half up.
	return t.Truncate(d)
}

// movingRate estimates an event rate over a sliding window of
// BucketNum*BucketLength. Events are counted in fixed-size time buckets, and
// the last element of counts is the bucket that contains lastUpdate.
//
// movingRate does no locking of its own; Budget only uses it while holding
// Budget.mu.
type movingRate struct {
	BucketLength time.Duration // width of a single bucket
	BucketNum    int           // number of buckets spanned by the window
	counts       []int         // per-bucket event counts, oldest first; at most BucketNum+1 entries
	lastUpdate   time.Time     // time passed to the most recent Add/Rate that advanced the window
}

// newMovingRate returns a movingRate whose window is one minute long, made of
// sixty one-second buckets.
func newMovingRate() *movingRate {
	return &movingRate{
		BucketLength: time.Second,
		BucketNum:    60,
	}
}

// elapsedInBucket returns how far lastUpdate lies into its bucket, i.e. into
// the newest bucket.
func (mr *movingRate) elapsedInBucket() time.Duration {
	return mr.lastUpdate.Sub(timeRoundDown(mr.lastUpdate, mr.BucketLength))
}

// count returns the number of events in the window ending at lastUpdate.
//
// Once the history is complete (BucketNum+1 buckets), the oldest bucket is
// only partially inside the window. It is weighted by the fraction of it that
// the window still covers.
func (mr *movingRate) count() float64 {
	// history is not yet fully initialized
	if len(mr.counts) <= mr.BucketNum {
		var s float64
		for _, c := range mr.counts {
			s += float64(c)
		}
		return s
	}

	oldestFraction := 1.0 - float64(mr.elapsedInBucket())/float64(mr.BucketLength)
	s := oldestFraction * float64(mr.counts[0])
	for _, c := range mr.counts[1:] {
		s += float64(c)
	}
	return s
}

// second returns the length, in seconds, of the span that count covers.
//
// Before the history is complete, this is the time from the start of the
// first bucket to lastUpdate. Afterwards it is the full window length. It is
// 0 when there is no history, or when the only bucket has no elapsed time.
func (mr *movingRate) second() float64 {
	if len(mr.counts) == 0 {
		return 0.0
	}
	// history is not yet fully initialized
	if len(mr.counts) <= mr.BucketNum {
		d := time.Duration(len(mr.counts)-1)*mr.BucketLength + mr.elapsedInBucket()
		return d.Seconds()
	}
	return (time.Duration(mr.BucketNum) * mr.BucketLength).Seconds()
}

// shift appends n empty buckets and drops the oldest buckets so that at most
// BucketNum+1 remain. It then advances lastUpdate to the bucket boundary n
// buckets later. n must be positive.
func (mr *movingRate) shift(n int) {
	// More than BucketNum+1 new buckets would just be discarded again.
	if n > mr.BucketNum+1 {
		n = mr.BucketNum + 1
	}
	mr.counts = append(mr.counts, make([]int, n)...)
	// We actually keep BucketNum+1 buckets. The newest and the oldest are
	// only partially inside the window, so together they keep the window
	// length constant.
	if excess := len(mr.counts) - (mr.BucketNum + 1); excess > 0 {
		mr.counts = mr.counts[excess:]
	}
	mr.lastUpdate = timeRoundDown(mr.lastUpdate, mr.BucketLength).Add(time.Duration(n) * mr.BucketLength)
}

// forward advances the bucket history to time t and records t as lastUpdate.
// Callers (Add, Rate) guarantee that t is not before lastUpdate.
func (mr *movingRate) forward(t time.Time) {
	defer func() {
		mr.lastUpdate = t
	}()

	// First use: start the history with a single empty bucket.
	if mr.lastUpdate.IsZero() {
		mr.counts = []int{0}
		return
	}

	rt := timeRoundDown(t, mr.BucketLength)
	// t falls into the same bucket as lastUpdate, so nothing needs shifting.
	if !rt.After(mr.lastUpdate) {
		return
	}

	n := int(rt.Sub(timeRoundDown(mr.lastUpdate, mr.BucketLength)) / mr.BucketLength)
	if n <= 0 {
		// Invariant violation: rt is after lastUpdate, so at least one
		// bucket boundary must have been crossed.
		panic(fmt.Sprintf("assertion failure: n = %d, want >0; rt = %v, mr.lastUpdate = %v, mr.BucketLength = %v",
			n, rt, mr.lastUpdate, mr.BucketLength))
	}
	mr.shift(n)
}

// Add records n events at time t. Samples older than lastUpdate are ignored.
func (mr *movingRate) Add(t time.Time, n int) {
	if t.Before(mr.lastUpdate) {
		return
	}
	mr.forward(t)
	mr.counts[len(mr.counts)-1] += n
}

// Rate advances the window to t and returns the event rate, in events per
// second, over that window. It returns NaN when t is before lastUpdate. When
// second() is 0 the floating-point division yields NaN (no events) or +Inf.
func (mr *movingRate) Rate(t time.Time) float64 {
	if t.Before(mr.lastUpdate) {
		return math.NaN()
	}
	mr.forward(t)
	return mr.count() / mr.second()
}