-
Notifications
You must be signed in to change notification settings - Fork 18
/
Copy pathtime.go
104 lines (91 loc) · 3.31 KB
/
time.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
package rolling
import (
"sync"
"time"
)
// TimePolicy is a window Accumulator implementation that uses some
// duration of time to determine the content of the window.
//
// Time is tracked in whole buckets: a timestamp's UnixNano value divided by
// bucketSizeNano. That bucket number modulo the number of buckets selects
// the slot of the window that receives the data.
type TimePolicy struct {
// bucketSize is the bucket duration handed to NewTimePolicy.
bucketSize time.Duration
// bucketSizeNano is bucketSize expressed in nanoseconds; it is the divisor
// used to turn a timestamp into a bucket number.
bucketSizeNano int64
// numberOfBuckets is len(window) as captured at construction time.
numberOfBuckets int
// numberOfBuckets64 is numberOfBuckets as an int64 so it can be used in
// arithmetic with bucket numbers without converting on every call.
numberOfBuckets64 int64
// window holds one slice of recorded values per bucket.
window [][]float64
// lastWindowOffset is the index into window used by the most recent append.
lastWindowOffset int
// lastWindowTime is the bucket number (not a wall-clock value) of the most
// recent append.
lastWindowTime int64
// lock is held for the duration of AppendWithTimestamp and Reduce.
lock *sync.Mutex
}
// NewTimePolicy creates a TimePolicy that manages the given window using
// rolling time durations.
//
// Every bucket of the window covers one bucketDuration. When data points
// arrive more than an entire window apart, the window ends up containing only
// the newest data point. When one or more bucket durations pass without any
// data, the skipped buckets are emptied so the window stays consistent.
//
// The policy stores the window it is given rather than a copy of it.
func NewTimePolicy(window Window, bucketDuration time.Duration) *TimePolicy {
	buckets := len(window)

	policy := new(TimePolicy)
	policy.bucketSize = bucketDuration
	policy.bucketSizeNano = bucketDuration.Nanoseconds()
	policy.numberOfBuckets = buckets
	policy.numberOfBuckets64 = int64(buckets)
	policy.window = window
	policy.lock = new(sync.Mutex)
	return policy
}
// resetWindow empties every bucket of the window. Buckets are truncated to
// length zero rather than replaced, so their backing arrays are kept.
func (w *TimePolicy) resetWindow() {
	for i := 0; i < len(w.window); i++ {
		w.window[i] = w.window[i][:0]
	}
}
// resetBuckets empties the buckets that lie strictly between the bucket of
// the last append (lastWindowOffset) and windowOffset, walking forward around
// the ring. Neither of the two end buckets is touched.
func (w *TimePolicy) resetBuckets(windowOffset int) {
	previous := w.lastWindowOffset

	// A negative gap means the current offset has wrapped past the end of
	// the ring; adding the ring size gives the forward distance.
	gap := windowOffset - previous
	if gap < 0 {
		gap += w.numberOfBuckets
	}

	for step := 1; step < gap; step++ {
		stale := (previous + step) % w.numberOfBuckets
		w.window[stale] = w.window[stale][:0]
	}
}
// keepConsistent expires data that has aged out of the window before the
// window is written to or read from.
//
// adjustedTime is the current bucket number and windowOffset is the index of
// that bucket in the ring. The method only clears buckets; it does not update
// lastWindowTime or lastWindowOffset.
//
//   - No time has passed since the last append: nothing is cleared.
//   - A full window or more has passed: every bucket is cleared. This
//     includes the case where exactly numberOfBuckets buckets have elapsed;
//     by then even the bucket of the last append is a whole window old, and
//     it shares its offset with the current bucket, so leaving it in place
//     would let new values be appended to expired data.
//   - Less than a full window has passed: only the buckets skipped between
//     the last append and now are cleared. The current bucket is left alone.
//
// NOTE(review): a bucket number older than lastWindowTime gives a negative
// elapsed value and still takes the skipped-buckets path, exactly as before;
// confirm whether out-of-order timestamps are meant to be supported.
func (w *TimePolicy) keepConsistent(adjustedTime int64, windowOffset int) {
	elapsed := adjustedTime - w.lastWindowTime
	switch {
	case elapsed >= w.numberOfBuckets64:
		// We've waited a full window or longer for data, so nothing that is
		// stored is still inside the window.
		w.resetWindow()
	case elapsed != 0:
		// One or more buckets may have been missed; zero them out.
		w.resetBuckets(windowOffset)
	}
}
// selectBucket maps a timestamp onto the ring. It returns the bucket number
// (the timestamp's UnixNano value divided by the bucket size in nanoseconds)
// and the index of that bucket in the window (the bucket number modulo the
// number of buckets).
func (w *TimePolicy) selectBucket(currentTime time.Time) (int64, int) {
	bucketTime := currentTime.UnixNano() / w.bucketSizeNano
	return bucketTime, int(bucketTime % w.numberOfBuckets64)
}
// AppendWithTimestamp records value in the bucket that timestamp falls into.
// It behaves like Append except that the caller supplies the time instead of
// the current wall clock being used.
//
// Expired buckets are cleared first. The value then either joins the bucket
// of the previous append, when it maps to the same offset, or starts that
// bucket afresh. The call holds the policy's lock for its whole duration.
func (w *TimePolicy) AppendWithTimestamp(value float64, timestamp time.Time) {
	w.lock.Lock()
	defer w.lock.Unlock()

	bucketTime, bucket := w.selectBucket(timestamp)
	w.keepConsistent(bucketTime, bucket)

	if bucket == w.lastWindowOffset {
		w.window[bucket] = append(w.window[bucket], value)
	} else {
		// Moving to a different bucket: whatever it held belongs to an
		// earlier trip around the ring, so replace it outright.
		w.window[bucket] = []float64{value}
	}

	// Remember where and when this append landed for the next call.
	w.lastWindowOffset = bucket
	w.lastWindowTime = bucketTime
}
// Append records value in the window, bucketing it by the current wall-clock
// time. It is AppendWithTimestamp called with time.Now().
func (w *TimePolicy) Append(value float64) {
	now := time.Now()
	w.AppendWithTimestamp(value, now)
}
// Reduce collapses the window to a single value by passing it to f and
// returning f's result.
//
// Before f runs, data that has aged out of the window as of time.Now() is
// cleared. f receives the policy's own window, not a copy, and is called
// while the policy's lock is held, so f must not call Append,
// AppendWithTimestamp or Reduce on the same policy.
func (w *TimePolicy) Reduce(f func(Window) float64) float64 {
	w.lock.Lock()
	defer w.lock.Unlock()

	adjustedTime, windowOffset := w.selectBucket(time.Now())
	w.keepConsistent(adjustedTime, windowOffset)

	// keepConsistent only clears the buckets skipped between the last append
	// and now; the current bucket is normally replaced by the next append.
	// When time has moved on since the last append, nothing has been written
	// to the current bucket during this trip around the ring, so anything in
	// it is at least a full window old and must not be reduced.
	if adjustedTime > w.lastWindowTime {
		w.window[windowOffset] = w.window[windowOffset][:0]
	}

	return f(w.window)
}