Skip to content

Commit 26cb737

Browse files
pracuccigouthamve
andauthored
Backported PR 3192 (#3211)
Signed-off-by: Marco Pracucci <[email protected]> Co-authored-by: Goutham Veeramachaneni <[email protected]>
1 parent 3d40bbb commit 26cb737

File tree

6 files changed

+174
-11
lines changed

6 files changed

+174
-11
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@
8484
* [BUGFIX] Configs: prevent validation of templates to fail when using template functions. #3157
8585
* [BUGFIX] Configuring the S3 URL with an `@` but without username and password doesn't enable the AWS static credentials anymore. #3170
8686
* [BUGFIX] Limit errors on ranged queries (`api/v1/query_range`) no longer return a status code `500` but `422` instead. #3167
87+
* [BUGFIX] Handle hash-collisions in the query path. #3192
8788

8889
## 1.3.0 / 2020-08-21
8990

integration/querier_test.go

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -619,3 +619,104 @@ func TestQuerierWithChunksStorage(t *testing.T) {
619619
assertServiceMetricsPrefixes(t, Querier, querier)
620620
assertServiceMetricsPrefixes(t, TableManager, tableManager)
621621
}
622+
623+
func TestHashCollisionHandling(t *testing.T) {
624+
s, err := e2e.NewScenario(networkName)
625+
require.NoError(t, err)
626+
defer s.Close()
627+
628+
require.NoError(t, writeFileToSharedDir(s, cortexSchemaConfigFile, []byte(cortexSchemaConfigYaml)))
629+
flags := mergeFlags(ChunksStorageFlags, map[string]string{})
630+
631+
// Start dependencies.
632+
dynamo := e2edb.NewDynamoDB()
633+
634+
consul := e2edb.NewConsul()
635+
require.NoError(t, s.StartAndWaitReady(consul, dynamo))
636+
637+
tableManager := e2ecortex.NewTableManager("table-manager", ChunksStorageFlags, "")
638+
require.NoError(t, s.StartAndWaitReady(tableManager))
639+
640+
// Wait until the first table-manager sync has completed, so that we're
641+
// sure the tables have been created.
642+
require.NoError(t, tableManager.WaitSumMetrics(e2e.Greater(0), "cortex_table_manager_sync_success_timestamp_seconds"))
643+
644+
// Start Cortex components for the write path.
645+
distributor := e2ecortex.NewDistributor("distributor", consul.NetworkHTTPEndpoint(), flags, "")
646+
ingester := e2ecortex.NewIngester("ingester", consul.NetworkHTTPEndpoint(), flags, "")
647+
require.NoError(t, s.StartAndWaitReady(distributor, ingester))
648+
649+
// Wait until the distributor has updated the ring.
650+
require.NoError(t, distributor.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))
651+
652+
// Push a series for each user to Cortex.
653+
now := time.Now()
654+
655+
c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), "", "", "", "user-0")
656+
require.NoError(t, err)
657+
658+
var series []prompb.TimeSeries
659+
var expectedVector model.Vector
660+
// Generate two series which collide on fingerprints and fast fingerprints.
661+
tsMillis := e2e.TimeToMilliseconds(now)
662+
metric1 := []prompb.Label{
663+
{Name: "A", Value: "K6sjsNNczPl"},
664+
{Name: labels.MetricName, Value: "fingerprint_collision"},
665+
}
666+
metric2 := []prompb.Label{
667+
{Name: "A", Value: "cswpLMIZpwt"},
668+
{Name: labels.MetricName, Value: "fingerprint_collision"},
669+
}
670+
671+
series = append(series, prompb.TimeSeries{
672+
Labels: metric1,
673+
Samples: []prompb.Sample{
674+
{Value: float64(0), Timestamp: tsMillis},
675+
},
676+
})
677+
expectedVector = append(expectedVector, &model.Sample{
678+
Metric: prompbLabelsToModelMetric(metric1),
679+
Value: model.SampleValue(float64(0)),
680+
Timestamp: model.Time(tsMillis),
681+
})
682+
series = append(series, prompb.TimeSeries{
683+
Labels: metric2,
684+
Samples: []prompb.Sample{
685+
{Value: float64(1), Timestamp: tsMillis},
686+
},
687+
})
688+
expectedVector = append(expectedVector, &model.Sample{
689+
Metric: prompbLabelsToModelMetric(metric2),
690+
Value: model.SampleValue(float64(1)),
691+
Timestamp: model.Time(tsMillis),
692+
})
693+
694+
res, err := c.Push(series)
695+
require.NoError(t, err)
696+
require.Equal(t, 200, res.StatusCode)
697+
698+
querier := e2ecortex.NewQuerier("querier", consul.NetworkHTTPEndpoint(), flags, "")
699+
require.NoError(t, s.StartAndWaitReady(querier))
700+
701+
// Wait until the querier has updated the ring.
702+
require.NoError(t, querier.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))
703+
704+
// Query the series.
705+
c, err = e2ecortex.NewClient("", querier.HTTPEndpoint(), "", "", "user-0")
706+
require.NoError(t, err)
707+
708+
result, err := c.Query("fingerprint_collision", now)
709+
require.NoError(t, err)
710+
require.Equal(t, model.ValVector, result.Type())
711+
require.Equal(t, expectedVector, result.(model.Vector))
712+
}
713+
714+
func prompbLabelsToModelMetric(pbLabels []prompb.Label) model.Metric {
715+
metric := model.Metric{}
716+
717+
for _, l := range pbLabels {
718+
metric[model.LabelName(l.Name)] = model.LabelValue(l.Value)
719+
}
720+
721+
return metric
722+
}

pkg/distributor/query.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -170,32 +170,32 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSet ri
170170
return nil, err
171171
}
172172

173-
hashToChunkseries := map[model.Fingerprint]ingester_client.TimeSeriesChunk{}
174-
hashToTimeSeries := map[model.Fingerprint]ingester_client.TimeSeries{}
173+
hashToChunkseries := map[string]ingester_client.TimeSeriesChunk{}
174+
hashToTimeSeries := map[string]ingester_client.TimeSeries{}
175175

176176
for _, result := range results {
177177
response := result.(*ingester_client.QueryStreamResponse)
178178

179179
// Parse any chunk series
180180
for _, series := range response.Chunkseries {
181-
hash := client.FastFingerprint(series.Labels)
182-
existing := hashToChunkseries[hash]
181+
key := client.LabelsToKeyString(client.FromLabelAdaptersToLabels(series.Labels))
182+
existing := hashToChunkseries[key]
183183
existing.Labels = series.Labels
184184
existing.Chunks = append(existing.Chunks, series.Chunks...)
185-
hashToChunkseries[hash] = existing
185+
hashToChunkseries[key] = existing
186186
}
187187

188188
// Parse any time series
189189
for _, series := range response.Timeseries {
190-
hash := client.FastFingerprint(series.Labels)
191-
existing := hashToTimeSeries[hash]
190+
key := client.LabelsToKeyString(client.FromLabelAdaptersToLabels(series.Labels))
191+
existing := hashToTimeSeries[key]
192192
existing.Labels = series.Labels
193193
if existing.Samples == nil {
194194
existing.Samples = series.Samples
195195
} else {
196196
existing.Samples = mergeSamples(existing.Samples, series.Samples)
197197
}
198-
hashToTimeSeries[hash] = existing
198+
hashToTimeSeries[key] = existing
199199
}
200200
}
201201

pkg/ingester/client/compat.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -322,6 +322,16 @@ func Fingerprint(labels labels.Labels) model.Fingerprint {
322322
return model.Fingerprint(sum)
323323
}
324324

325+
// LabelsToKeyString is used to form a string to be used as
326+
// the hashKey. Don't print, use l.String() for printing.
327+
func LabelsToKeyString(l labels.Labels) string {
328+
// We are allocating 1024, even though most series are less than 600b long.
329+
// But this is not an issue as this function is being inlined when called in a loop
330+
// and buffer allocated is a static buffer and not a dynamic buffer on the heap.
331+
b := make([]byte, 0, 1024)
332+
return string(l.Bytes(b))
333+
}
334+
325335
// MarshalJSON implements json.Marshaler.
326336
func (s Sample) MarshalJSON() ([]byte, error) {
327337
t, err := json.Marshal(model.Time(s.TimestampMs))

pkg/ingester/client/compat_test.go

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"fmt"
55
"reflect"
66
"sort"
7+
"strconv"
78
"testing"
89
"unsafe"
910

@@ -216,3 +217,53 @@ func verifyCollision(t *testing.T, collision bool, ls1 labels.Labels, ls2 labels
216217
t.Errorf("expected different fingerprints for %v (%016x) and %v (%016x)", ls1.String(), Fingerprint(ls1), ls2.String(), Fingerprint(ls2))
217218
}
218219
}
220+
221+
// The main usecase for `LabelsToKeyString` is to generate hashKeys
222+
// for maps. We are benchmarking that here.
223+
func BenchmarkSeriesMap(b *testing.B) {
224+
benchmarkSeriesMap(100000, b)
225+
}
226+
227+
func benchmarkSeriesMap(numSeries int, b *testing.B) {
228+
series := makeSeries(numSeries)
229+
sm := make(map[string]int, numSeries)
230+
231+
b.ReportAllocs()
232+
b.ResetTimer()
233+
for n := 0; n < b.N; n++ {
234+
for i, s := range series {
235+
sm[LabelsToKeyString(s)] = i
236+
}
237+
238+
for _, s := range series {
239+
_, ok := sm[LabelsToKeyString(s)]
240+
if !ok {
241+
b.Fatal("element missing")
242+
}
243+
}
244+
245+
if len(sm) != numSeries {
246+
b.Fatal("the number of series expected:", numSeries, "got:", len(sm))
247+
}
248+
}
249+
}
250+
251+
func makeSeries(n int) []labels.Labels {
252+
series := make([]labels.Labels, 0, n)
253+
for i := 0; i < n; i++ {
254+
series = append(series, labels.FromMap(map[string]string{
255+
"label0": "value0",
256+
"label1": "value1",
257+
"label2": "value2",
258+
"label3": "value3",
259+
"label4": "value4",
260+
"label5": "value5",
261+
"label6": "value6",
262+
"label7": "value7",
263+
"label8": "value8",
264+
"label9": strconv.Itoa(i),
265+
}))
266+
}
267+
268+
return series
269+
}

pkg/querier/chunk_store_queryable.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,10 +53,10 @@ func (q *chunkStoreQuerier) Select(_ bool, sp *storage.SelectHints, matchers ...
5353

5454
// Series in the returned set are sorted alphabetically by labels.
5555
func partitionChunks(chunks []chunk.Chunk, mint, maxt int64, iteratorFunc chunkIteratorFunc) storage.SeriesSet {
56-
chunksBySeries := map[model.Fingerprint][]chunk.Chunk{}
56+
chunksBySeries := map[string][]chunk.Chunk{}
5757
for _, c := range chunks {
58-
fp := client.Fingerprint(c.Metric)
59-
chunksBySeries[fp] = append(chunksBySeries[fp], c)
58+
key := client.LabelsToKeyString(c.Metric)
59+
chunksBySeries[key] = append(chunksBySeries[key], c)
6060
}
6161

6262
series := make([]storage.Series, 0, len(chunksBySeries))

0 commit comments

Comments
 (0)