
Commit 0e9f7b4

jtlisi, tomwilkie, and pstibrany authored
Update metrics if transfer fails (#2338)
* update metrics if transfer fails. Signed-off-by: Jacob Lisi <[email protected]>
* update changelog. Signed-off-by: Jacob Lisi <[email protected]>
* s/TeardDown/Teardown/. Signed-off-by: Tom Wilkie <[email protected]>
* ensure memoryChunks is decremented after failed transfer. Signed-off-by: Jacob Lisi <[email protected]>
* fix type and use sub instead of dec. Signed-off-by: Jacob Lisi <[email protected]>
* Extracted fillUserStatesFromStream method. Metrics correction is done outside of this method still. Signed-off-by: Peter Štibraný <[email protected]>
* Moved metrics correction into defer block. Signed-off-by: Peter Štibraný <[email protected]>
* Error to be returned is now retErr. Signed-off-by: Peter Štibraný <[email protected]>
* Error to be returned is now retErr. Signed-off-by: Peter Štibraný <[email protected]>

Co-authored-by: Tom Wilkie <[email protected]>
Co-authored-by: Peter Štibraný <[email protected]>
1 parent 9a447fd commit 0e9f7b4

7 files changed: +177 -64 lines changed

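The substance of the change is easier to see outside the diff: when a TransferChunks stream fails partway through, the ingester now subtracts whatever it had added to its in-memory chunk gauge and tears down the partially built user state, so the exported metrics do not drift. Below is a minimal, self-contained sketch of that defer-based rollback pattern; the names (receiveChunks, example_memory_chunks) are invented for illustration and are not Cortex's own.

package main

import (
	"errors"
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

// memoryChunks stands in for the ingester's in-memory chunk gauge; the name is
// invented for this sketch.
var memoryChunks = prometheus.NewGauge(prometheus.GaugeOpts{
	Name: "example_memory_chunks",
	Help: "Chunks currently held in memory (hypothetical).",
})

// receiveChunks adds to the gauge as batches arrive. The deferred function
// inspects the named result retErr and, on failure, subtracts everything this
// call added, so an aborted transfer leaves the gauge where it started.
func receiveChunks(batches []int, failAt int) (retErr error) {
	added := 0.0

	defer func() {
		if retErr != nil {
			memoryChunks.Sub(added) // roll back the partial transfer
		}
	}()

	for i, n := range batches {
		if i == failAt {
			retErr = errors.New("stream interrupted")
			return
		}
		delta := float64(n)
		added += delta
		memoryChunks.Add(delta)
	}
	return nil
}

func main() {
	prometheus.MustRegister(memoryChunks)
	err := receiveChunks([]int{5, 3, 7}, 2) // fails on the third batch
	fmt.Println("transfer error:", err)     // gauge ends back at 0 despite the two batches added
}

Running the sketch leaves the gauge at zero even though the loop added chunks before the simulated failure, which is the invariant this commit restores for the ingester's chunk and series metrics.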

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
@@ -91,6 +91,7 @@
 * [ENHANCEMENT] Added FIFO cache metrics for current number of entries and memory usage. #2270
 * [ENHANCEMENT] Output all config fields to /config API, including those with empty value. #2209
 * [ENHANCEMENT] Add "missing_metric_name" and "metric_name_invalid" reasons to cortex_discarded_samples_total metric. #2346
+* [BUGFIX] Ensure user state metrics are updated if a transfer fails. #2338
 * [BUGFIX] Fixed etcd client keepalive settings. #2278
 * [BUGFIX] Fixed bug in updating last element of FIFO cache. #2270
 * [BUGFIX] Register the metrics of the WAL. #2295

pkg/ingester/ingester_test.go

Lines changed: 9 additions & 8 deletions
@@ -14,6 +14,7 @@ import (
 	"testing"
 	"time"
 
+	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/common/model"
 	"github.com/prometheus/prometheus/pkg/labels"
 	"github.com/stretchr/testify/assert"
@@ -36,14 +37,14 @@ type testStore struct {
 	chunks map[string][]chunk.Chunk
 }
 
-func newTestStore(t require.TestingT, cfg Config, clientConfig client.Config, limits validation.Limits) (*testStore, *Ingester) {
+func newTestStore(t require.TestingT, cfg Config, clientConfig client.Config, limits validation.Limits, reg prometheus.Registerer) (*testStore, *Ingester) {
 	store := &testStore{
 		chunks: map[string][]chunk.Chunk{},
 	}
 	overrides, err := validation.NewOverrides(limits, nil)
 	require.NoError(t, err)
 
-	ing, err := New(cfg, clientConfig, overrides, store, nil)
+	ing, err := New(cfg, clientConfig, overrides, store, reg)
 	require.NoError(t, err)
 	require.NoError(t, services.StartAndAwaitRunning(context.Background(), ing))
 
@@ -54,7 +55,7 @@ func newDefaultTestStore(t require.TestingT) (*testStore, *Ingester) {
 	return newTestStore(t,
 		defaultIngesterTestConfig(),
 		defaultClientTestConfig(),
-		defaultLimitsTestConfig())
+		defaultLimitsTestConfig(), nil)
 }
 
 func (s *testStore) Put(ctx context.Context, chunks []chunk.Chunk) error {
@@ -238,7 +239,7 @@ func TestIngesterIdleFlush(t *testing.T) {
 	cfg.FlushCheckPeriod = 20 * time.Millisecond
 	cfg.MaxChunkIdle = 100 * time.Millisecond
 	cfg.RetainPeriod = 500 * time.Millisecond
-	store, ing := newTestStore(t, cfg, defaultClientTestConfig(), defaultLimitsTestConfig())
+	store, ing := newTestStore(t, cfg, defaultClientTestConfig(), defaultLimitsTestConfig(), nil)
 
 	userIDs, testData := pushTestSamples(t, ing, 4, 100, 0)
 
@@ -272,7 +273,7 @@ func TestIngesterSpreadFlush(t *testing.T) {
 	cfg := defaultIngesterTestConfig()
 	cfg.SpreadFlushes = true
 	cfg.FlushCheckPeriod = 20 * time.Millisecond
-	store, ing := newTestStore(t, cfg, defaultClientTestConfig(), defaultLimitsTestConfig())
+	store, ing := newTestStore(t, cfg, defaultClientTestConfig(), defaultLimitsTestConfig(), nil)
 
 	userIDs, testData := pushTestSamples(t, ing, 4, 100, 0)
 
@@ -365,7 +366,7 @@ func TestIngesterUserSeriesLimitExceeded(t *testing.T) {
 	limits := defaultLimitsTestConfig()
 	limits.MaxLocalSeriesPerUser = 1
 
-	_, ing := newTestStore(t, defaultIngesterTestConfig(), defaultClientTestConfig(), limits)
+	_, ing := newTestStore(t, defaultIngesterTestConfig(), defaultClientTestConfig(), limits, nil)
 	defer services.StopAndAwaitTerminated(context.Background(), ing) //nolint:errcheck
 
 	userID := "1"
@@ -422,7 +423,7 @@ func TestIngesterMetricSeriesLimitExceeded(t *testing.T) {
 	limits := defaultLimitsTestConfig()
 	limits.MaxLocalSeriesPerMetric = 1
 
-	_, ing := newTestStore(t, defaultIngesterTestConfig(), defaultClientTestConfig(), limits)
+	_, ing := newTestStore(t, defaultIngesterTestConfig(), defaultClientTestConfig(), limits, nil)
 	defer services.StopAndAwaitTerminated(context.Background(), ing) //nolint:errcheck
 
 	userID := "1"
@@ -573,7 +574,7 @@ func benchmarkIngesterPush(b *testing.B, limits validation.Limits, errorsExpecte
 		b.Run(fmt.Sprintf("encoding=%s", enc.name), func(b *testing.B) {
 			b.ResetTimer()
 			for iter := 0; iter < b.N; iter++ {
-				_, ing := newTestStore(b, cfg, clientCfg, limits)
+				_, ing := newTestStore(b, cfg, clientCfg, limits, nil)
 				// Bump the timestamp on each of our test samples each time round the loop
 				for j := 0; j < samples; j++ {
 					for i := range allSamples {
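The helper change above threads an optional prometheus.Registerer through newTestStore and into New, so a test such as TestTeardown can pass its own registry while every other caller keeps passing nil. A hedged sketch of that optional-registerer convention; the type and metric below are illustrative, not Cortex's actual constructors.

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

// exampleMetrics mirrors the common pattern of taking an optional Registerer:
// a real registry lets a test assert on the exported values, while nil keeps
// the collectors usable but unregistered.
type exampleMetrics struct {
	memoryChunks prometheus.Gauge
}

func newExampleMetrics(reg prometheus.Registerer) *exampleMetrics {
	return &exampleMetrics{
		memoryChunks: promauto.With(reg).NewGauge(prometheus.GaugeOpts{
			Name: "example_memory_chunks",
			Help: "Chunks currently held in memory (hypothetical).",
		}),
	}
}

func main() {
	reg := prometheus.NewPedanticRegistry()
	m := newExampleMetrics(reg) // a test would pass its own registry here
	m.memoryChunks.Add(3)

	families, _ := reg.Gather() // ignore the gather error for brevity
	for _, mf := range families {
		fmt.Println(mf.GetName(), mf.GetMetric()[0].GetGauge().GetValue())
	}

	_ = newExampleMetrics(nil) // production-style call; metrics simply aren't registered
}

promauto.With(nil) still returns working collectors, they are just not registered anywhere, which is why passing nil leaves all the existing tests behaving as before.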

pkg/ingester/lifecycle_test.go

Lines changed: 2 additions & 2 deletions
@@ -71,7 +71,7 @@ func TestIngesterRestart(t *testing.T) {
 	config.LifecyclerConfig.SkipUnregister = true
 
 	{
-		_, ingester := newTestStore(t, config, clientConfig, limits)
+		_, ingester := newTestStore(t, config, clientConfig, limits, nil)
 		time.Sleep(100 * time.Millisecond)
 		// doesn't actually unregister due to skipUnregister: true
 		require.NoError(t, services.StopAndAwaitTerminated(context.Background(), ingester))
@@ -82,7 +82,7 @@ func TestIngesterRestart(t *testing.T) {
 	})
 
 	{
-		_, ingester := newTestStore(t, config, clientConfig, limits)
+		_, ingester := newTestStore(t, config, clientConfig, limits, nil)
 		time.Sleep(100 * time.Millisecond)
 		// doesn't actually unregister due to skipUnregister: true
 		require.NoError(t, services.StopAndAwaitTerminated(context.Background(), ingester))

pkg/ingester/transfer.go

Lines changed: 86 additions & 50 deletions
@@ -29,69 +29,105 @@ var (
 	errTransferNoPendingIngesters = errors.New("no pending ingesters")
 )
 
-// TransferChunks receives all the chunks from another ingester.
-func (i *Ingester) TransferChunks(stream client.Ingester_TransferChunksServer) error {
-	fromIngesterID := ""
-	seriesReceived := 0
-	xfer := func() error {
-		userStates := newUserStates(i.limiter, i.cfg, i.metrics)
+// returns source ingesterID, number of received series, added chunks and error
+func (i *Ingester) fillUserStatesFromStream(userStates *userStates, stream client.Ingester_TransferChunksServer) (fromIngesterID string, seriesReceived int, retErr error) {
+	chunksAdded := 0.0
 
-		for {
-			wireSeries, err := stream.Recv()
-			if err == io.EOF {
-				break
-			}
-			if err != nil {
-				return errors.Wrap(err, "TransferChunks: Recv")
-			}
+	defer func() {
+		if retErr != nil {
+			// Ensure the in memory chunks are updated to reflect the number of dropped chunks from the transfer
+			i.metrics.memoryChunks.Sub(chunksAdded)
 
-			// We can't send "extra" fields with a streaming call, so we repeat
-			// wireSeries.FromIngesterId and assume it is the same every time
-			// round this loop.
-			if fromIngesterID == "" {
-				fromIngesterID = wireSeries.FromIngesterId
-				level.Info(util.Logger).Log("msg", "processing TransferChunks request", "from_ingester", fromIngesterID)
+			// If an error occurs during the transfer and the user state is to be discarded,
+			// ensure the metrics it exports reflect this.
+			userStates.teardown()
+		}
+	}()
 
-				// Before transfer, make sure 'from' ingester is in correct state to call ClaimTokensFor later
-				err := i.checkFromIngesterIsInLeavingState(stream.Context(), fromIngesterID)
-				if err != nil {
-					return errors.Wrap(err, "TransferChunks: checkFromIngesterIsInLeavingState")
-				}
-			}
-			descs, err := fromWireChunks(wireSeries.Chunks)
-			if err != nil {
-				return errors.Wrap(err, "TransferChunks: fromWireChunks")
-			}
+	for {
+		wireSeries, err := stream.Recv()
+		if err == io.EOF {
+			break
+		}
+		if err != nil {
+			retErr = errors.Wrap(err, "TransferChunks: Recv")
+			return
+		}
 
-			state, fp, series, err := userStates.getOrCreateSeries(stream.Context(), wireSeries.UserId, wireSeries.Labels, nil)
-			if err != nil {
-				return errors.Wrapf(err, "TransferChunks: getOrCreateSeries: user %s series %s", wireSeries.UserId, wireSeries.Labels)
-			}
-			prevNumChunks := len(series.chunkDescs)
+		// We can't send "extra" fields with a streaming call, so we repeat
+		// wireSeries.FromIngesterId and assume it is the same every time
+		// round this loop.
+		if fromIngesterID == "" {
+			fromIngesterID = wireSeries.FromIngesterId
+			level.Info(util.Logger).Log("msg", "processing TransferChunks request", "from_ingester", fromIngesterID)
 
-			err = series.setChunks(descs)
-			state.fpLocker.Unlock(fp) // acquired in getOrCreateSeries
+			// Before transfer, make sure 'from' ingester is in correct state to call ClaimTokensFor later
+			err := i.checkFromIngesterIsInLeavingState(stream.Context(), fromIngesterID)
 			if err != nil {
-				return errors.Wrapf(err, "TransferChunks: setChunks: user %s series %s", wireSeries.UserId, wireSeries.Labels)
+				retErr = errors.Wrap(err, "TransferChunks: checkFromIngesterIsInLeavingState")
+				return
 			}
-
-			seriesReceived++
-			i.metrics.memoryChunks.Add(float64(len(series.chunkDescs) - prevNumChunks))
-			i.metrics.receivedChunks.Add(float64(len(descs)))
+		}
+		descs, err := fromWireChunks(wireSeries.Chunks)
+		if err != nil {
+			retErr = errors.Wrap(err, "TransferChunks: fromWireChunks")
+			return
 		}
 
-		if seriesReceived == 0 {
-			level.Error(util.Logger).Log("msg", "received TransferChunks request with no series", "from_ingester", fromIngesterID)
-			return fmt.Errorf("TransferChunks: no series")
+		state, fp, series, err := userStates.getOrCreateSeries(stream.Context(), wireSeries.UserId, wireSeries.Labels, nil)
+		if err != nil {
+			retErr = errors.Wrapf(err, "TransferChunks: getOrCreateSeries: user %s series %s", wireSeries.UserId, wireSeries.Labels)
+			return
 		}
+		prevNumChunks := len(series.chunkDescs)
 
-		if fromIngesterID == "" {
-			level.Error(util.Logger).Log("msg", "received TransferChunks request with no ID from ingester")
-			return fmt.Errorf("no ingester id")
+		err = series.setChunks(descs)
+		state.fpLocker.Unlock(fp) // acquired in getOrCreateSeries
+		if err != nil {
+			retErr = errors.Wrapf(err, "TransferChunks: setChunks: user %s series %s", wireSeries.UserId, wireSeries.Labels)
+			return
 		}
 
-		if err := i.lifecycler.ClaimTokensFor(stream.Context(), fromIngesterID); err != nil {
-			return errors.Wrap(err, "TransferChunks: ClaimTokensFor")
+		seriesReceived++
+		chunksDelta := float64(len(series.chunkDescs) - prevNumChunks)
+		chunksAdded += chunksDelta
+		i.metrics.memoryChunks.Add(chunksDelta)
+		i.metrics.receivedChunks.Add(float64(len(descs)))
+	}
+
+	if seriesReceived == 0 {
+		level.Error(util.Logger).Log("msg", "received TransferChunks request with no series", "from_ingester", fromIngesterID)
+		retErr = fmt.Errorf("TransferChunks: no series")
+		return
+	}
+
+	if fromIngesterID == "" {
+		level.Error(util.Logger).Log("msg", "received TransferChunks request with no ID from ingester")
+		retErr = fmt.Errorf("no ingester id")
+		return
+	}
+
+	if err := i.lifecycler.ClaimTokensFor(stream.Context(), fromIngesterID); err != nil {
+		retErr = errors.Wrap(err, "TransferChunks: ClaimTokensFor")
+		return
+	}
+
+	return
+}
+
+// TransferChunks receives all the chunks from another ingester.
+func (i *Ingester) TransferChunks(stream client.Ingester_TransferChunksServer) error {
+	fromIngesterID := ""
+	seriesReceived := 0
+
+	xfer := func() error {
+		userStates := newUserStates(i.limiter, i.cfg, i.metrics)
+
+		var err error
+		fromIngesterID, seriesReceived, err = i.fillUserStatesFromStream(userStates, stream)
+
+		if err != nil {
+			return err
 		}
 
 		i.userStatesMtx.Lock()

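Two of the commit messages ("Moved metrics correction into defer block", "Error to be returned is now retErr") hinge on a Go detail worth spelling out: a deferred cleanup can only observe the function's failure through a named result such as retErr (or a variable it explicitly captures), because the defer runs after the return value has been set. A small standalone illustration, with invented function names:

package main

import (
	"errors"
	"fmt"
)

// namedResult mirrors the fillUserStatesFromStream shape: because the result is
// named, the deferred cleanup sees whatever error the function returns.
func namedResult(fail bool) (retErr error) {
	defer func() {
		fmt.Println("named result, cleanup sees error:", retErr != nil)
	}()
	if fail {
		return errors.New("boom") // the return statement assigns retErr before the defer runs
	}
	return nil
}

// localVariable shows the trap: the defer inspects a local err that is never
// assigned on the failing path, so the cleanup misses the error entirely.
func localVariable(fail bool) error {
	var err error
	defer func() {
		fmt.Println("local variable, cleanup sees error:", err != nil)
	}()
	if fail {
		return errors.New("boom") // bypasses err, so the defer prints false
	}
	return err
}

func main() {
	_ = namedResult(true)   // named result, cleanup sees error: true
	_ = localVariable(true) // local variable, cleanup sees error: false
}

fillUserStatesFromStream follows the first shape: every early exit assigns retErr and returns, so the deferred block subtracts chunksAdded and calls userStates.teardown() only when the transfer actually failed.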
pkg/ingester/user_state.go

Lines changed: 9 additions & 0 deletions
@@ -149,6 +149,15 @@ func (us *userStates) getOrCreate(userID string) *userState {
 	return state
 }
 
+// teardown ensures metrics are accurately updated if a userStates struct is discarded
+func (us *userStates) teardown() {
+	for _, u := range us.cp() {
+		u.memSeriesRemovedTotal.Add(float64(u.fpToSeries.length()))
+		u.memSeries.Sub(float64(u.fpToSeries.length()))
+		us.metrics.memUsers.Dec()
+	}
+}
+
 func (us *userStates) getViaContext(ctx context.Context) (*userState, bool, error) {
 	userID, err := user.ExtractOrgID(ctx)
 	if err != nil {

pkg/ingester/user_state_test.go

Lines changed: 66 additions & 0 deletions
@@ -4,10 +4,14 @@ import (
 	"context"
 	"fmt"
 	"math"
+	"strings"
 	"testing"
 
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/testutil"
 	"github.com/prometheus/common/model"
 	"github.com/prometheus/prometheus/pkg/labels"
+	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 	"github.com/weaveworks/common/user"
 )
@@ -71,3 +75,65 @@ func TestForSeriesMatchingBatching(t *testing.T) {
 		})
 	}
 }
+
+// TestTeardown ensures metrics are updated correctly if the userState is discarded
+func TestTeardown(t *testing.T) {
+	reg := prometheus.NewPedanticRegistry()
+	_, ing := newTestStore(t,
+		defaultIngesterTestConfig(),
+		defaultClientTestConfig(),
+		defaultLimitsTestConfig(),
+		reg)
+
+	pushTestSamples(t, ing, 100, 100, 0)
+
+	// Assert exported metrics (3 blocks, 5 files per block, 2 files WAL).
+	metricNames := []string{
+		"cortex_ingester_memory_series_created_total",
+		"cortex_ingester_memory_series_removed_total",
+		"cortex_ingester_memory_series",
+		"cortex_ingester_memory_users",
+	}
+
+	assert.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(`
+		# HELP cortex_ingester_memory_series_removed_total The total number of series that were removed per user.
+		# TYPE cortex_ingester_memory_series_removed_total counter
+		cortex_ingester_memory_series_removed_total{user="1"} 0
+		cortex_ingester_memory_series_removed_total{user="2"} 0
+		cortex_ingester_memory_series_removed_total{user="3"} 0
+		# HELP cortex_ingester_memory_series_created_total The total number of series that were created per user.
+		# TYPE cortex_ingester_memory_series_created_total counter
+		cortex_ingester_memory_series_created_total{user="1"} 100
+		cortex_ingester_memory_series_created_total{user="2"} 100
+		cortex_ingester_memory_series_created_total{user="3"} 100
+		# HELP cortex_ingester_memory_series The current number of series in memory.
+		# TYPE cortex_ingester_memory_series gauge
+		cortex_ingester_memory_series 300
+		# HELP cortex_ingester_memory_users The current number of users in memory.
+		# TYPE cortex_ingester_memory_users gauge
+		cortex_ingester_memory_users 3
+	`), metricNames...))
+
+	ing.userStatesMtx.Lock()
+	defer ing.userStatesMtx.Unlock()
+	ing.userStates.teardown()
+
+	assert.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(`
+		# HELP cortex_ingester_memory_series_removed_total The total number of series that were removed per user.
+		# TYPE cortex_ingester_memory_series_removed_total counter
+		cortex_ingester_memory_series_removed_total{user="1"} 100
+		cortex_ingester_memory_series_removed_total{user="2"} 100
+		cortex_ingester_memory_series_removed_total{user="3"} 100
+		# HELP cortex_ingester_memory_series_created_total The total number of series that were created per user.
+		# TYPE cortex_ingester_memory_series_created_total counter
+		cortex_ingester_memory_series_created_total{user="1"} 100
+		cortex_ingester_memory_series_created_total{user="2"} 100
+		cortex_ingester_memory_series_created_total{user="3"} 100
+		# HELP cortex_ingester_memory_series The current number of series in memory.
+		# TYPE cortex_ingester_memory_series gauge
+		cortex_ingester_memory_series 0
+		# HELP cortex_ingester_memory_users The current number of users in memory.
+		# TYPE cortex_ingester_memory_users gauge
+		cortex_ingester_memory_users 0
+	`), metricNames...))
+}
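TestTeardown leans on prometheus.NewPedanticRegistry plus testutil.GatherAndCompare from client_golang: the helper gathers the registry, parses the expected exposition-format text, and fails with a diff if the named metric families differ. A minimal, self-contained example of the same assertion style, using a throwaway gauge instead of the ingester's metrics:

package main

import (
	"fmt"
	"strings"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

func main() {
	reg := prometheus.NewPedanticRegistry()

	users := prometheus.NewGauge(prometheus.GaugeOpts{
		Name: "example_memory_users",
		Help: "The current number of users in memory (hypothetical).",
	})
	reg.MustRegister(users)
	users.Add(3)

	// GatherAndCompare gathers reg, parses the expected text format and returns
	// an error describing the difference, or nil if the named families match.
	err := testutil.GatherAndCompare(reg, strings.NewReader(`
# HELP example_memory_users The current number of users in memory (hypothetical).
# TYPE example_memory_users gauge
example_memory_users 3
`), "example_memory_users")
	fmt.Println("metrics match:", err == nil)
}

Restricting the comparison to explicit metric names, as the test does with metricNames, keeps the assertion focused and ignores anything else the registry happens to export.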

pkg/ingester/wal_test.go

Lines changed: 4 additions & 4 deletions
@@ -34,7 +34,7 @@ func TestWAL(t *testing.T) {
 	numRestarts := 3
 
 	// Build an ingester, add some samples, then shut it down.
-	_, ing := newTestStore(t, cfg, defaultClientTestConfig(), defaultLimitsTestConfig())
+	_, ing := newTestStore(t, cfg, defaultClientTestConfig(), defaultLimitsTestConfig(), nil)
 	userIDs, testData := pushTestSamples(t, ing, numSeries, numSamplesPerSeriesPerPush, 0)
 	require.NoError(t, services.StopAndAwaitTerminated(context.Background(), ing))
 
@@ -44,7 +44,7 @@ func TestWAL(t *testing.T) {
 		cfg.WALConfig.CheckpointEnabled = false
 	}
 	// Start a new ingester and recover the WAL.
-	_, ing = newTestStore(t, cfg, defaultClientTestConfig(), defaultLimitsTestConfig())
+	_, ing = newTestStore(t, cfg, defaultClientTestConfig(), defaultLimitsTestConfig(), nil)
 
 	for i, userID := range userIDs {
 		testData[userID] = buildTestMatrix(numSeries, (r+1)*numSamplesPerSeriesPerPush, i)
@@ -78,7 +78,7 @@ func TestCheckpointRepair(t *testing.T) {
 	cfg.WALConfig.Dir = dirname
 
 	// Build an ingester, add some samples, then shut it down.
-	_, ing := newTestStore(t, cfg, defaultClientTestConfig(), defaultLimitsTestConfig())
+	_, ing := newTestStore(t, cfg, defaultClientTestConfig(), defaultLimitsTestConfig(), nil)
 
 	w, ok := ing.wal.(*walWrapper)
 	require.True(t, ok)
@@ -132,7 +132,7 @@ func TestCheckpointRepair(t *testing.T) {
 	}
 
 	// Open an ingester for the repair.
-	_, ing = newTestStore(t, cfg, defaultClientTestConfig(), defaultLimitsTestConfig())
+	_, ing = newTestStore(t, cfg, defaultClientTestConfig(), defaultLimitsTestConfig(), nil)
 	w, ok = ing.wal.(*walWrapper)
 	require.True(t, ok)
 	// defer in case we hit an error though we explicitly close it later.
