forked from vulcand/oxy
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbucket.go
132 lines (117 loc) · 4.34 KB
/
bucket.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
package ratelimit
import (
"fmt"
"time"
"github.com/vulcand/oxy/internal/holsterv4/clock"
)
// UndefinedDelay default delay.
const UndefinedDelay = -1
// rate defines token bucket parameters.
type rate struct {
period time.Duration
average int64
burst int64
}
func (r *rate) String() string {
return fmt.Sprintf("rate(%v/%v, burst=%v)", r.average, r.period, r.burst)
}
// tokenBucket Implements token bucket algorithm (http://en.wikipedia.org/wiki/Token_bucket)
type tokenBucket struct {
// The time period controlled by the bucket in nanoseconds.
period time.Duration
// The number of nanoseconds that takes to add one more token to the total
// number of available tokens. It effectively caches the value that could
// have been otherwise deduced from refillRate.
timePerToken time.Duration
// The maximum number of tokens that can be accumulate in the bucket.
burst int64
// The number of tokens available for consumption at the moment. It can
// nether be larger then capacity.
availableTokens int64
// Tells when tokensAvailable was updated the last time.
lastRefresh clock.Time
// The number of tokens consumed the last time.
lastConsumed int64
}
// newTokenBucket crates a `tokenBucket` instance for the specified `Rate`.
func newTokenBucket(rate *rate) *tokenBucket {
period := rate.period
if period == 0 {
period = clock.Nanosecond
}
return &tokenBucket{
period: period,
timePerToken: time.Duration(int64(period) / rate.average),
burst: rate.burst,
lastRefresh: clock.Now().UTC(),
availableTokens: rate.burst,
}
}
// consume makes an attempt to consume the specified number of tokens from the
// bucket. If there are enough tokens available then `0, nil` is returned; if
// tokens to consume is larger than the burst size, then an error is returned
// and the delay is not defined; otherwise returned a none zero delay that tells
// how much time the caller needs to wait until the desired number of tokens
// will become available for consumption.
func (tb *tokenBucket) consume(tokens int64) (time.Duration, error) {
tb.updateAvailableTokens()
tb.lastConsumed = 0
if tokens > tb.burst {
return UndefinedDelay, fmt.Errorf("requested tokens larger than max tokens")
}
if tb.availableTokens < tokens {
return tb.timeTillAvailable(tokens), nil
}
tb.availableTokens -= tokens
tb.lastConsumed = tokens
return 0, nil
}
// rollback reverts effect of the most recent consumption. If the most recent
// `consume` resulted in an error or a burst overflow, and therefore did not
// modify the number of available tokens, then `rollback` won't do that either.
// It is safe to call this method multiple times, for the second and all
// following calls have no effect.
func (tb *tokenBucket) rollback() {
tb.availableTokens += tb.lastConsumed
tb.lastConsumed = 0
}
// update modifies `average` and `burst` fields of the token bucket according
// to the provided `Rate`.
func (tb *tokenBucket) update(rate *rate) error {
if rate.period != tb.period {
return fmt.Errorf("period mismatch: %v != %v", tb.period, rate.period)
}
tb.timePerToken = time.Duration(int64(tb.period) / rate.average)
tb.burst = rate.burst
if tb.availableTokens > rate.burst {
tb.availableTokens = rate.burst
}
return nil
}
// timeTillAvailable returns the number of nanoseconds that we need to
// wait until the specified number of tokens becomes available for consumption.
func (tb *tokenBucket) timeTillAvailable(tokens int64) time.Duration {
missingTokens := tokens - tb.availableTokens
return time.Duration(missingTokens) * tb.timePerToken
}
// updateAvailableTokens updates the number of tokens available for consumption.
// It is calculated based on the refill rate, the time passed since last refresh,
// and is limited by the bucket capacity.
func (tb *tokenBucket) updateAvailableTokens() {
now := clock.Now().UTC()
timePassed := now.Sub(tb.lastRefresh)
if tb.timePerToken == 0 {
return
}
tokens := tb.availableTokens + int64(timePassed/tb.timePerToken)
// If we haven't added any tokens that means that not enough time has passed,
// in this case do not adjust last refill checkpoint, otherwise it will be
// always moving in time in case of frequent requests that exceed the rate
if tokens != tb.availableTokens {
tb.lastRefresh = now
tb.availableTokens = tokens
}
if tb.availableTokens > tb.burst {
tb.availableTokens = tb.burst
}
}