Skip to content

Commit 876a8f5

Browse files
authored
Remove old temporary checkpoints (#2726)
If an ingester crashes while doing a checkpoint, which is likely as checkpoints are almost always happening, then a checkpoint like checkpoint.123456.tmp will be left on disk. When the next checkpoint succeeds, the old .tmp checkpoints are not deleted even though they will never be used. After enough crashes it is possible to fill the entire disk with .tmp checkpoints, bringing down an ingester. Signed-off-by: Chris Marchbanks <[email protected]>
1 parent 54c4a64 commit 876a8f5

File tree

3 files changed

+82
-10
lines changed

3 files changed

+82
-10
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,7 @@
139139
* [BUGFIX] Cassandra: fixed an edge case leading to an invalid CQL query when querying the index on a Cassandra store. #2639
140140
* [BUGFIX] Ingester: increment series per metric when recovering from WAL or transfer. #2674
141141
* [BUGFIX] Fixed `wrong number of arguments for 'mget' command` Redis error when a query has no chunks to lookup from storage. #2700
142+
* [BUGFIX] Ingester: Automatically remove old tmp checkpoints, fixing a potential disk space leak after an ingester crashes.
142143

143144
## 1.1.0 / 2020-05-21
144145

pkg/ingester/wal.go

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,9 @@ import (
88
"math"
99
"os"
1010
"path/filepath"
11+
"regexp"
1112
"runtime"
1213
"strconv"
13-
"strings"
1414
"sync"
1515
"time"
1616

@@ -413,16 +413,13 @@ func lastCheckpoint(dir string) (string, int, error) {
413413
for i := 0; i < len(dirs); i++ {
414414
di := dirs[i]
415415

416-
if !strings.HasPrefix(di.Name(), checkpointPrefix) {
416+
idx, err := checkpointIndex(di.Name(), false)
417+
if err != nil {
417418
continue
418419
}
419420
if !di.IsDir() {
420421
return "", -1, fmt.Errorf("checkpoint %s is not a directory", di.Name())
421422
}
422-
idx, err := strconv.Atoi(di.Name()[len(checkpointPrefix):])
423-
if err != nil {
424-
continue
425-
}
426423
if idx > maxIdx {
427424
checkpointDir = di.Name()
428425
maxIdx = idx
@@ -450,10 +447,7 @@ func (w *walWrapper) deleteCheckpoints(maxIndex int) (err error) {
450447
return err
451448
}
452449
for _, fi := range files {
453-
if !strings.HasPrefix(fi.Name(), checkpointPrefix) {
454-
continue
455-
}
456-
index, err := strconv.Atoi(fi.Name()[len(checkpointPrefix):])
450+
index, err := checkpointIndex(fi.Name(), true)
457451
if err != nil || index >= maxIndex {
458452
continue
459453
}
@@ -464,6 +458,23 @@ func (w *walWrapper) deleteCheckpoints(maxIndex int) (err error) {
464458
return errs.Err()
465459
}
466460

461+
var checkpointRe = regexp.MustCompile("^" + regexp.QuoteMeta(checkpointPrefix) + "(\\d+)(\\.tmp)?$")
462+
463+
// checkpointIndex returns the index of a given checkpoint file. It handles
464+
// both regular and temporary checkpoints according to the includeTmp flag. If
465+
// the file is not a checkpoint it returns an error.
466+
func checkpointIndex(filename string, includeTmp bool) (int, error) {
467+
result := checkpointRe.FindStringSubmatch(filename)
468+
if len(result) < 2 {
469+
return 0, errors.New("file is not a checkpoint")
470+
}
471+
// Filter out temporary checkpoints if desired.
472+
if !includeTmp && len(result) == 3 && result[2] != "" {
473+
return 0, errors.New("temporary checkpoint")
474+
}
475+
return strconv.Atoi(result[1])
476+
}
477+
467478
// checkpointSeries write the chunks of the series to the checkpoint.
468479
func (w *walWrapper) checkpointSeries(userID string, fp model.Fingerprint, series *memorySeries, wireChunks []client.Chunk, b []byte) ([]client.Chunk, []byte, error) {
469480
var err error

pkg/ingester/wal_test.go

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -285,6 +285,66 @@ func TestMigrationToTypedRecord(t *testing.T) {
285285
require.Equal(t, checkpointRecord, newCheckpointRecordDecoded)
286286
}
287287

288+
func TestCheckpointIndex(t *testing.T) {
289+
tcs := []struct {
290+
filename string
291+
includeTmp bool
292+
index int
293+
shouldError bool
294+
}{
295+
{
296+
filename: "checkpoint.123456",
297+
includeTmp: false,
298+
index: 123456,
299+
shouldError: false,
300+
},
301+
{
302+
filename: "checkpoint.123456",
303+
includeTmp: true,
304+
index: 123456,
305+
shouldError: false,
306+
},
307+
{
308+
filename: "checkpoint.123456.tmp",
309+
includeTmp: true,
310+
index: 123456,
311+
shouldError: false,
312+
},
313+
{
314+
filename: "checkpoint.123456.tmp",
315+
includeTmp: false,
316+
shouldError: true,
317+
},
318+
{
319+
filename: "not-checkpoint.123456.tmp",
320+
includeTmp: true,
321+
shouldError: true,
322+
},
323+
{
324+
filename: "checkpoint.123456.tmp2",
325+
shouldError: true,
326+
},
327+
{
328+
filename: "checkpoints123456",
329+
shouldError: true,
330+
},
331+
{
332+
filename: "012345",
333+
shouldError: true,
334+
},
335+
}
336+
for _, tc := range tcs {
337+
index, err := checkpointIndex(tc.filename, tc.includeTmp)
338+
if tc.shouldError {
339+
require.Error(t, err, "filename: %s, includeTmp: %t", tc.filename, tc.includeTmp)
340+
continue
341+
}
342+
343+
require.NoError(t, err, "filename: %s, includeTmp: %t", tc.filename, tc.includeTmp)
344+
require.Equal(t, tc.index, index)
345+
}
346+
}
347+
288348
func BenchmarkWALReplay(b *testing.B) {
289349
dirname, err := ioutil.TempDir("", "cortex-wal")
290350
require.NoError(b, err)

0 commit comments

Comments
 (0)