Skip to content
This repository was archived by the owner on Jul 18, 2023. It is now read-only.

Commit 1d44eff

Browse files
committed
aggregation: always treat points inside timer interval as one bucket
1 parent b47e85f commit 1d44eff

File tree

2 files changed

+113
-25
lines changed

2 files changed

+113
-25
lines changed

src/InfluxDB.Collector/Pipeline/Aggregate/AggregatePointEmitter.cs

Lines changed: 43 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,10 @@ public AggregatePointEmitter(TimeSpan timeSpan, bool sumIncrements, Func<IEnumer
2121

2222
protected override void HandleBatch(IReadOnlyCollection<PointData> batch)
2323
{
24+
DateTime bucketThreshold = DateTime.UtcNow - _interval;
25+
2426
var grouped = batch.GroupBy(x => new GroupingKey(
25-
x.UtcTimestamp.HasValue ? x.UtcTimestamp.Value.Ticks / _interval.Ticks : 0,
27+
DetermineBucket(x.UtcTimestamp, bucketThreshold),
2628
DetermineKind(x),
2729
x.Measurement,
2830
x.Tags
@@ -33,6 +35,45 @@ protected override void HandleBatch(IReadOnlyCollection<PointData> batch)
3335
_parent.Emit(aggregated);
3436
}
3537

38+
private long DetermineBucket(DateTime? timestamp, DateTime bucketThreshold)
39+
{
40+
if (!timestamp.HasValue)
41+
{
42+
return 0;
43+
}
44+
45+
DateTime value = timestamp.Value;
46+
47+
if (value >= bucketThreshold)
48+
{
49+
// point was in timer interval
50+
return bucketThreshold.Ticks;
51+
}
52+
else
53+
{
54+
// point was before timer interval, round it to multiple of interval
55+
return (value.Ticks / _interval.Ticks) * _interval.Ticks;
56+
}
57+
}
58+
59+
static MeasurementKind DetermineKind(PointData x)
60+
{
61+
if (x.Fields.Count != 1) return MeasurementKind.Other;
62+
63+
if (x.Fields.TryGetValue("count", out var count) && count is long)
64+
{
65+
return MeasurementKind.Increment;
66+
}
67+
else if (x.Fields.TryGetValue("value", out var value) && value is TimeSpan)
68+
{
69+
return MeasurementKind.Time;
70+
}
71+
else
72+
{
73+
return MeasurementKind.Other;
74+
}
75+
}
76+
3677
IEnumerable<PointData> Aggregate(IGrouping<GroupingKey, PointData> group)
3778
{
3879
GroupingKey key = group.Key;
@@ -69,25 +110,7 @@ IEnumerable<PointData> Aggregate(IGrouping<GroupingKey, PointData> group)
69110

70111
private DateTime AverageTime(GroupingKey key)
71112
{
72-
return new DateTime(key.Bucket * _interval.Ticks + _interval.Ticks / 2, DateTimeKind.Utc);
73-
}
74-
75-
static MeasurementKind DetermineKind(PointData x)
76-
{
77-
if (x.Fields.Count != 1) return MeasurementKind.Other;
78-
79-
if (x.Fields.TryGetValue("count", out var count) && count is long)
80-
{
81-
return MeasurementKind.Increment;
82-
}
83-
else if (x.Fields.TryGetValue("value", out var value) && value is TimeSpan)
84-
{
85-
return MeasurementKind.Time;
86-
}
87-
else
88-
{
89-
return MeasurementKind.Other;
90-
}
113+
return new DateTime(key.Bucket + _interval.Ticks / 2, DateTimeKind.Utc);
91114
}
92115
}
93116
}

test/InfluxDB.LineProtocol.Tests/Collector/AggregationTests.cs

Lines changed: 70 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,15 +10,60 @@ namespace InfluxDB.LineProtocol.Tests.Collector
1010
{
1111
public class AggregationTests
1212
{
13+
[Fact]
14+
public async Task PointsAreCorrectlyGrouped()
15+
{
16+
var written = new TaskCompletionSource<object>();
17+
var list = new List<PointData>();
18+
19+
var start = DateTime.UtcNow;
20+
21+
var collector = new CollectorConfiguration()
22+
.Aggregate.AtInterval(TimeSpan.FromMilliseconds(500))
23+
.Aggregate.SumIncrements()
24+
.WriteTo.Emitter(pts =>
25+
{
26+
list.AddRange(pts);
27+
written.SetResult(0);
28+
})
29+
.CreateCollector();
30+
31+
collector.Write("foo",
32+
new Dictionary<string, object> { { "count", 1L } },
33+
new Dictionary<string, string> { { "tag1", "a" } },
34+
start
35+
);
36+
collector.Write("foo",
37+
new Dictionary<string, object> { { "count", 1L } },
38+
new Dictionary<string, string> { { "tag1", "a" } },
39+
start + TimeSpan.FromMilliseconds(200)
40+
);
41+
collector.Write("foo",
42+
new Dictionary<string, object> { { "count", 1L } },
43+
new Dictionary<string, string> { { "tag1", "a" } },
44+
start + TimeSpan.FromMilliseconds(400)
45+
);
46+
47+
await written.Task;
48+
49+
Assert.Equal(1, list.Count);
50+
Assert.Equal(3L, list[0].Fields["count"]);
51+
}
52+
1353
[Fact]
1454
public async Task IncrementsCanBeSummed()
1555
{
56+
var written = new TaskCompletionSource<object>();
1657
var list = new List<PointData>();
1758

1859
IPointEmitter collector = new CollectorConfiguration()
1960
.Aggregate.AtInterval(TimeSpan.FromMilliseconds(500))
2061
.Aggregate.SumIncrements()
21-
.WriteTo.Emitter(pts => list.AddRange(pts))
62+
.WriteTo.Emitter(pts =>
63+
{
64+
list.AddRange(pts);
65+
written.SetResult(0);
66+
})
2267
.CreateCollector();
2368

2469
collector.Emit(new[]
@@ -49,12 +94,17 @@ public async Task IncrementsCanBeSummed()
4994
[Fact]
5095
public async Task TimesCanBeAveraged()
5196
{
97+
var written = new TaskCompletionSource<object>();
5298
var list = new List<PointData>();
5399

54100
IPointEmitter collector = new CollectorConfiguration()
55101
.Aggregate.AtInterval(TimeSpan.FromMilliseconds(400))
56102
.Aggregate.AggregateTimes(Enumerable.Average)
57-
.WriteTo.Emitter(pts => list.AddRange(pts))
103+
.WriteTo.Emitter(pts =>
104+
{
105+
list.AddRange(pts);
106+
written.SetResult(0);
107+
})
58108
.CreateCollector();
59109

60110
collector.Emit(new[]
@@ -85,12 +135,17 @@ public async Task TimesCanBeAveraged()
85135
[Fact]
86136
public async Task DifferentTagsArentAggregated()
87137
{
138+
var written = new TaskCompletionSource<object>();
88139
var list = new List<PointData>();
89140

90141
IPointEmitter collector = new CollectorConfiguration()
91142
.Aggregate.AtInterval(TimeSpan.FromMilliseconds(500))
92143
.Aggregate.SumIncrements()
93-
.WriteTo.Emitter(pts => list.AddRange(pts))
144+
.WriteTo.Emitter(pts =>
145+
{
146+
list.AddRange(pts);
147+
written.SetResult(0);
148+
})
94149
.CreateCollector();
95150

96151
collector.Emit(new[]
@@ -119,12 +174,17 @@ public async Task DifferentTagsArentAggregated()
119174
[Fact]
120175
public async Task DifferentMeasurementsArentAggregated()
121176
{
177+
var written = new TaskCompletionSource<object>();
122178
var list = new List<PointData>();
123179

124180
IPointEmitter collector = new CollectorConfiguration()
125181
.Aggregate.AtInterval(TimeSpan.FromMilliseconds(500))
126182
.Aggregate.SumIncrements()
127-
.WriteTo.Emitter(pts => list.AddRange(pts))
183+
.WriteTo.Emitter(pts =>
184+
{
185+
list.AddRange(pts);
186+
written.SetResult(0);
187+
})
128188
.CreateCollector();
129189

130190
collector.Emit(new[]
@@ -153,12 +213,17 @@ public async Task DifferentMeasurementsArentAggregated()
153213
[Fact]
154214
public async Task DifferentTimeSpansArentAggregated()
155215
{
216+
var written = new TaskCompletionSource<object>();
156217
var list = new List<PointData>();
157218

158219
IPointEmitter collector = new CollectorConfiguration()
159220
.Aggregate.AtInterval(TimeSpan.FromMilliseconds(500))
160221
.Aggregate.SumIncrements()
161-
.WriteTo.Emitter(pts => list.AddRange(pts))
222+
.WriteTo.Emitter(pts =>
223+
{
224+
list.AddRange(pts);
225+
written.SetResult(0);
226+
})
162227
.CreateCollector();
163228

164229
collector.Emit(new[]

0 commit comments

Comments
 (0)