Skip to content
This repository was archived by the owner on Jul 18, 2023. It is now read-only.

Commit f708be2

Browse files
committed
aggregation: always treat points inside timer interval as one bucket
1 parent b47e85f commit f708be2

File tree

2 files changed

+114
-25
lines changed

2 files changed

+114
-25
lines changed

src/InfluxDB.Collector/Pipeline/Aggregate/AggregatePointEmitter.cs

Lines changed: 44 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,11 @@ public AggregatePointEmitter(TimeSpan timeSpan, bool sumIncrements, Func<IEnumer
2121

2222
protected override void HandleBatch(IReadOnlyCollection<PointData> batch)
2323
{
24+
DateTime now = DateTime.UtcNow;
25+
DateTime bucketThreshold = now - _interval;
26+
2427
var grouped = batch.GroupBy(x => new GroupingKey(
25-
x.UtcTimestamp.HasValue ? x.UtcTimestamp.Value.Ticks / _interval.Ticks : 0,
28+
DetermineBucket(x.UtcTimestamp, bucketThreshold, now),
2629
DetermineKind(x),
2730
x.Measurement,
2831
x.Tags
@@ -33,6 +36,45 @@ protected override void HandleBatch(IReadOnlyCollection<PointData> batch)
3336
_parent.Emit(aggregated);
3437
}
3538

39+
private long DetermineBucket(DateTime? timestamp, DateTime bucketThreshold, DateTime now)
40+
{
41+
if (!timestamp.HasValue)
42+
{
43+
return 0;
44+
}
45+
46+
DateTime value = timestamp.Value;
47+
48+
if (value >= bucketThreshold && value <= now)
49+
{
50+
// point was in timer interval
51+
return bucketThreshold.Ticks;
52+
}
53+
else
54+
{
55+
// point was before or after timer interval, round it to multiple of interval
56+
return (value.Ticks / _interval.Ticks) * _interval.Ticks;
57+
}
58+
}
59+
60+
static MeasurementKind DetermineKind(PointData x)
61+
{
62+
if (x.Fields.Count != 1) return MeasurementKind.Other;
63+
64+
if (x.Fields.TryGetValue("count", out var count) && count is long)
65+
{
66+
return MeasurementKind.Increment;
67+
}
68+
else if (x.Fields.TryGetValue("value", out var value) && value is TimeSpan)
69+
{
70+
return MeasurementKind.Time;
71+
}
72+
else
73+
{
74+
return MeasurementKind.Other;
75+
}
76+
}
77+
3678
IEnumerable<PointData> Aggregate(IGrouping<GroupingKey, PointData> group)
3779
{
3880
GroupingKey key = group.Key;
@@ -69,25 +111,7 @@ IEnumerable<PointData> Aggregate(IGrouping<GroupingKey, PointData> group)
69111

70112
private DateTime AverageTime(GroupingKey key)
71113
{
72-
return new DateTime(key.Bucket * _interval.Ticks + _interval.Ticks / 2, DateTimeKind.Utc);
73-
}
74-
75-
static MeasurementKind DetermineKind(PointData x)
76-
{
77-
if (x.Fields.Count != 1) return MeasurementKind.Other;
78-
79-
if (x.Fields.TryGetValue("count", out var count) && count is long)
80-
{
81-
return MeasurementKind.Increment;
82-
}
83-
else if (x.Fields.TryGetValue("value", out var value) && value is TimeSpan)
84-
{
85-
return MeasurementKind.Time;
86-
}
87-
else
88-
{
89-
return MeasurementKind.Other;
90-
}
114+
return new DateTime(key.Bucket + _interval.Ticks / 2, DateTimeKind.Utc);
91115
}
92116
}
93117
}

test/InfluxDB.LineProtocol.Tests/Collector/AggregationTests.cs

Lines changed: 70 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,15 +10,60 @@ namespace InfluxDB.LineProtocol.Tests.Collector
1010
{
1111
public class AggregationTests
1212
{
13+
[Fact]
14+
public async Task PointsAreCorrectlyGrouped()
15+
{
16+
var written = new TaskCompletionSource<object>();
17+
var list = new List<PointData>();
18+
19+
var start = DateTime.UtcNow;
20+
21+
var collector = new CollectorConfiguration()
22+
.Aggregate.AtInterval(TimeSpan.FromMilliseconds(500))
23+
.Aggregate.SumIncrements()
24+
.WriteTo.Emitter(pts =>
25+
{
26+
list.AddRange(pts);
27+
written.SetResult(0);
28+
})
29+
.CreateCollector();
30+
31+
collector.Write("foo",
32+
new Dictionary<string, object> { { "count", 1L } },
33+
new Dictionary<string, string> { { "tag1", "a" } },
34+
start
35+
);
36+
collector.Write("foo",
37+
new Dictionary<string, object> { { "count", 1L } },
38+
new Dictionary<string, string> { { "tag1", "a" } },
39+
start + TimeSpan.FromMilliseconds(200)
40+
);
41+
collector.Write("foo",
42+
new Dictionary<string, object> { { "count", 1L } },
43+
new Dictionary<string, string> { { "tag1", "a" } },
44+
start + TimeSpan.FromMilliseconds(400)
45+
);
46+
47+
await written.Task;
48+
49+
Assert.Equal(1, list.Count);
50+
Assert.Equal(3L, list[0].Fields["count"]);
51+
}
52+
1353
[Fact]
1454
public async Task IncrementsCanBeSummed()
1555
{
56+
var written = new TaskCompletionSource<object>();
1657
var list = new List<PointData>();
1758

1859
IPointEmitter collector = new CollectorConfiguration()
1960
.Aggregate.AtInterval(TimeSpan.FromMilliseconds(500))
2061
.Aggregate.SumIncrements()
21-
.WriteTo.Emitter(pts => list.AddRange(pts))
62+
.WriteTo.Emitter(pts =>
63+
{
64+
list.AddRange(pts);
65+
written.SetResult(0);
66+
})
2267
.CreateCollector();
2368

2469
collector.Emit(new[]
@@ -49,12 +94,17 @@ public async Task IncrementsCanBeSummed()
4994
[Fact]
5095
public async Task TimesCanBeAveraged()
5196
{
97+
var written = new TaskCompletionSource<object>();
5298
var list = new List<PointData>();
5399

54100
IPointEmitter collector = new CollectorConfiguration()
55101
.Aggregate.AtInterval(TimeSpan.FromMilliseconds(400))
56102
.Aggregate.AggregateTimes(Enumerable.Average)
57-
.WriteTo.Emitter(pts => list.AddRange(pts))
103+
.WriteTo.Emitter(pts =>
104+
{
105+
list.AddRange(pts);
106+
written.SetResult(0);
107+
})
58108
.CreateCollector();
59109

60110
collector.Emit(new[]
@@ -85,12 +135,17 @@ public async Task TimesCanBeAveraged()
85135
[Fact]
86136
public async Task DifferentTagsArentAggregated()
87137
{
138+
var written = new TaskCompletionSource<object>();
88139
var list = new List<PointData>();
89140

90141
IPointEmitter collector = new CollectorConfiguration()
91142
.Aggregate.AtInterval(TimeSpan.FromMilliseconds(500))
92143
.Aggregate.SumIncrements()
93-
.WriteTo.Emitter(pts => list.AddRange(pts))
144+
.WriteTo.Emitter(pts =>
145+
{
146+
list.AddRange(pts);
147+
written.SetResult(0);
148+
})
94149
.CreateCollector();
95150

96151
collector.Emit(new[]
@@ -119,12 +174,17 @@ public async Task DifferentTagsArentAggregated()
119174
[Fact]
120175
public async Task DifferentMeasurementsArentAggregated()
121176
{
177+
var written = new TaskCompletionSource<object>();
122178
var list = new List<PointData>();
123179

124180
IPointEmitter collector = new CollectorConfiguration()
125181
.Aggregate.AtInterval(TimeSpan.FromMilliseconds(500))
126182
.Aggregate.SumIncrements()
127-
.WriteTo.Emitter(pts => list.AddRange(pts))
183+
.WriteTo.Emitter(pts =>
184+
{
185+
list.AddRange(pts);
186+
written.SetResult(0);
187+
})
128188
.CreateCollector();
129189

130190
collector.Emit(new[]
@@ -153,12 +213,17 @@ public async Task DifferentMeasurementsArentAggregated()
153213
[Fact]
154214
public async Task DifferentTimeSpansArentAggregated()
155215
{
216+
var written = new TaskCompletionSource<object>();
156217
var list = new List<PointData>();
157218

158219
IPointEmitter collector = new CollectorConfiguration()
159220
.Aggregate.AtInterval(TimeSpan.FromMilliseconds(500))
160221
.Aggregate.SumIncrements()
161-
.WriteTo.Emitter(pts => list.AddRange(pts))
222+
.WriteTo.Emitter(pts =>
223+
{
224+
list.AddRange(pts);
225+
written.SetResult(0);
226+
})
162227
.CreateCollector();
163228

164229
collector.Emit(new[]

0 commit comments

Comments
 (0)