-
Notifications
You must be signed in to change notification settings - Fork 18
/
Copy pathreduce_test.go
281 lines (252 loc) · 7.42 KB
/
reduce_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
package rolling
import (
"fmt"
"testing"
)
// https://gist.github.com/cevaris/bc331cbe970b03816c6b
var epsilon = 0.00000001
func floatEquals(a float64, b float64) bool {
return (a-b) < epsilon && (b-a) < epsilon
}
var largeEpsilon = 0.001
func floatMostlyEquals(a float64, b float64) bool {
return (a-b) < largeEpsilon && (b-a) < largeEpsilon
}
func TestCount(t *testing.T) {
var numberOfPoints = 100
var w = NewWindow(numberOfPoints)
var p = NewPointPolicy(w)
for x := 1; x <= numberOfPoints; x = x + 1 {
p.Append(float64(x))
}
var result = p.Reduce(Count)
var expected = 100.0
if !floatEquals(result, expected) {
t.Fatalf("count calculated incorrectly: %f versus %f", expected, result)
}
}
func TestCountPreallocatedWindow(t *testing.T) {
var numberOfPoints = 100
var w = NewPreallocatedWindow(numberOfPoints, 100)
var p = NewPointPolicy(w)
for x := 1; x <= numberOfPoints; x = x + 1 {
p.Append(float64(x))
}
var result = p.Reduce(Count)
var expected = 100.0
if !floatEquals(result, expected) {
t.Fatalf("count with prealloc window calculated incorrectly: %f versus %f", expected, result)
}
}
func TestSum(t *testing.T) {
var numberOfPoints = 100
var w = NewWindow(numberOfPoints)
var p = NewPointPolicy(w)
for x := 1; x <= numberOfPoints; x = x + 1 {
p.Append(float64(x))
}
var result = p.Reduce(Sum)
var expected = 5050.0
if !floatEquals(result, expected) {
t.Fatalf("avg calculated incorrectly: %f versus %f", expected, result)
}
}
func TestAvg(t *testing.T) {
var numberOfPoints = 100
var w = NewWindow(numberOfPoints)
var p = NewPointPolicy(w)
for x := 1; x <= numberOfPoints; x = x + 1 {
p.Append(float64(x))
}
var result = p.Reduce(Avg)
var expected = 50.5
if !floatEquals(result, expected) {
t.Fatalf("avg calculated incorrectly: %f versus %f", expected, result)
}
}
func TestMax(t *testing.T) {
var numberOfPoints = 100
var w = NewWindow(numberOfPoints)
var p = NewPointPolicy(w)
for x := 1; x <= numberOfPoints; x = x + 1 {
p.Append(100.0 - float64(x))
}
var result = p.Reduce(Max)
var expected = 99.0
if !floatEquals(result, expected) {
t.Fatalf("max calculated incorrectly: %f versus %f", expected, result)
}
}
func TestMin(t *testing.T) {
var numberOfPoints = 100
var w = NewWindow(numberOfPoints)
var p = NewPointPolicy(w)
for x := 1; x <= numberOfPoints; x = x + 1 {
p.Append(float64(x))
}
var result = p.Reduce(Min)
var expected = 1.0
if !floatEquals(result, expected) {
t.Fatalf("Min calculated incorrectly: %f versus %f", expected, result)
}
}
func TestPercentileAggregateInterpolateWhenEmpty(t *testing.T) {
var numberOfPoints = 0
var w = NewWindow(numberOfPoints)
var p = NewPointPolicy(w)
var perc = 99.9
var a = Percentile(perc)
var result = p.Reduce(a)
if !floatEquals(result, 0) {
t.Fatalf("percentile should be zero but got %f", result)
}
}
func TestPercentileAggregateInterpolateWhenInsufficientData(t *testing.T) {
var numberOfPoints = 100
var w = NewWindow(numberOfPoints)
var p = NewPointPolicy(w)
for x := 1; x <= numberOfPoints; x = x + 1 {
p.Append(float64(x))
}
var perc = 99.9
var a = Percentile(perc)
var result = p.Reduce(a)
// When there are insufficient values to satisfy the precision then the
// percentile algorithm degenerates to a max function. In this case, we need
// 1000 values in order to select a 99.9 but only have 100. 100 is also the
// maximum value and will be selected as k and k+1 in the linear
// interpolation.
var expected = 100.0
if !floatEquals(result, expected) {
t.Fatalf("%f percentile calculated incorrectly: %f versus %f", perc, expected, result)
}
}
func TestPercentileAggregateInterpolateWhenSufficientData(t *testing.T) {
var numberOfPoints = 1000
var w = NewWindow(numberOfPoints)
var p = NewPointPolicy(w)
for x := 1; x <= numberOfPoints; x = x + 1 {
p.Append(float64(x))
}
var perc = 99.9
var a = Percentile(perc)
var result = p.Reduce(a)
var expected = 999.5
if !floatEquals(result, expected) {
t.Fatalf("%f percentile calculated incorrectly: %f versus %f", perc, expected, result)
}
}
func TestFastPercentileAggregateInterpolateWhenEmpty(t *testing.T) {
var numberOfPoints = 0
var w = NewWindow(numberOfPoints)
var p = NewPointPolicy(w)
var perc = 99.9
var a = FastPercentile(perc)
var result = p.Reduce(a)
if !floatEquals(result, 0) {
t.Fatalf("fast percentile should be zero but got %f", result)
}
}
func TestFastPercentileAggregateInterpolateWhenSufficientData(t *testing.T) {
// Using a larger dataset so that the algorithm can converge on the
// correct value. Smaller datasets where the value might be interpolated
// linearly in the typical percentile calculation results in larger error
// in the result. This is acceptable so long as the estimated value approaches
// the correct value as more data are given.
var numberOfPoints = 10000
var w = NewWindow(numberOfPoints)
var p = NewPointPolicy(w)
for x := 1; x <= numberOfPoints; x = x + 1 {
p.Append(float64(x))
}
var perc = 99.9
var a = FastPercentile(perc)
var result = p.Reduce(a)
var expected = 9990.0
if !floatEquals(result, expected) {
t.Fatalf("%f percentile calculated incorrectly: %f versus %f", perc, expected, result)
}
}
func TestFastPercentileAggregateUsingPSquaredDataSet(t *testing.T) {
var numberOfPoints = 20
var w = NewWindow(numberOfPoints)
var p = NewPointPolicy(w)
p.Append(0.02)
p.Append(0.15)
p.Append(0.74)
p.Append(0.83)
p.Append(3.39)
p.Append(22.37)
p.Append(10.15)
p.Append(15.43)
p.Append(38.62)
p.Append(15.92)
p.Append(34.60)
p.Append(10.28)
p.Append(1.47)
p.Append(0.40)
p.Append(0.05)
p.Append(11.39)
p.Append(0.27)
p.Append(0.42)
p.Append(0.09)
p.Append(11.37)
var perc = 50.0
var a = FastPercentile(perc)
var result = p.Reduce(a)
var expected = 4.44
if !floatMostlyEquals(result, expected) {
t.Fatalf("%f percentile calculated incorrectly: %f versus %f", perc, expected, result)
}
}
var aggregateResult float64
type policy interface {
Append(float64)
Reduce(func(Window) float64) float64
}
type aggregateBench struct {
inserts int
policy policy
aggregate func(Window) float64
aggregateName string
}
func BenchmarkAggregates(b *testing.B) {
var baseCases = []*aggregateBench{
{aggregate: Sum, aggregateName: "sum"},
{aggregate: Min, aggregateName: "min"},
{aggregate: Max, aggregateName: "max"},
{aggregate: Avg, aggregateName: "avg"},
{aggregate: Count, aggregateName: "count"},
{aggregate: Percentile(50.0), aggregateName: "p50"},
{aggregate: Percentile(99.9), aggregateName: "p99.9"},
{aggregate: FastPercentile(50.0), aggregateName: "fp50"},
{aggregate: FastPercentile(99.9), aggregateName: "fp99.9"},
}
var insertions = []int{1, 1000, 10000, 100000}
var benchCases = make([]*aggregateBench, 0, len(baseCases)*len(insertions))
for _, baseCase := range baseCases {
for _, inserts := range insertions {
var w = NewWindow(inserts)
var p = NewPointPolicy(w)
for x := 1; x <= inserts; x = x + 1 {
p.Append(float64(x))
}
benchCases = append(benchCases, &aggregateBench{
inserts: inserts,
aggregate: baseCase.aggregate,
aggregateName: baseCase.aggregateName,
policy: p,
})
}
}
for _, benchCase := range benchCases {
b.Run(fmt.Sprintf("Aggregate:%s-DataPoints:%d", benchCase.aggregateName, benchCase.inserts), func(bt *testing.B) {
var result float64
bt.ResetTimer()
for n := 0; n < bt.N; n = n + 1 {
result = benchCase.policy.Reduce(benchCase.aggregate)
}
aggregateResult = result
})
}
}