Skip to content

Commit ffecf3b

Browse files
authored
Optimize cleaner run time (#6815)
* Optimize cleaner run time.
  Signed-off-by: Alex Le <[email protected]>
* Make the old bucket.DeletePrefix concurrent.
  Signed-off-by: Alex Le <[email protected]>
* Add a metric to record failures pushing to the job channel, update CHANGELOG, and add more logs with timing values.
  Signed-off-by: Alex Le <[email protected]>
---------
Signed-off-by: Alex Le <[email protected]>
1 parent be90c4a commit ffecf3b

File tree

4 files changed

+92
-23
lines changed

4 files changed

+92
-23
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
* [ENHANCEMENT] Compactor, Store Gateway: Introduce user scanner strategy and user index. #6780
4141
* [ENHANCEMENT] Querier: Support chunks cache for parquet queryable. #6805
4242
* [ENHANCEMENT] Parquet Storage: Add some metrics for parquet blocks and converter. #6809
43+
* [ENHANCEMENT] Compactor: Optimize cleaner run time. #6815
4344
* [BUGFIX] Ingester: Avoid error or early throttling when READONLY ingesters are present in the ring #6517
4445
* [BUGFIX] Ingester: Fix labelset data race condition. #6573
4546
* [BUGFIX] Compactor: Cleaner should not put deletion marker for blocks with no-compact marker. #6576

pkg/compactor/blocks_cleaner.go

Lines changed: 41 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ type BlocksCleaner struct {
8282
remainingPlannedCompactions *prometheus.GaugeVec
8383
inProgressCompactions *prometheus.GaugeVec
8484
oldestPartitionGroupOffset *prometheus.GaugeVec
85+
enqueueJobFailed *prometheus.CounterVec
8586
}
8687

8788
func NewBlocksCleaner(
@@ -186,6 +187,10 @@ func NewBlocksCleaner(
186187
remainingPlannedCompactions: remainingPlannedCompactions,
187188
inProgressCompactions: inProgressCompactions,
188189
oldestPartitionGroupOffset: oldestPartitionGroupOffset,
190+
enqueueJobFailed: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
191+
Name: "cortex_compactor_enqueue_cleaner_job_failed_total",
192+
Help: "Total number of cleaner jobs failed to be enqueued.",
193+
}, []string{"user_status"}),
189194
}
190195

191196
c.Service = services.NewBasicService(c.starting, c.loop, nil)
@@ -243,13 +248,25 @@ func (c *BlocksCleaner) loop(ctx context.Context) error {
243248
continue
244249
}
245250
cleanJobTimestamp := time.Now().Unix()
246-
usersChan <- &cleanerJob{
251+
252+
select {
253+
case usersChan <- &cleanerJob{
247254
users: activeUsers,
248255
timestamp: cleanJobTimestamp,
256+
}:
257+
default:
258+
level.Warn(c.logger).Log("msg", "unable to push cleaning job to usersChan")
259+
c.enqueueJobFailed.WithLabelValues(activeStatus).Inc()
249260
}
250-
deleteChan <- &cleanerJob{
261+
262+
select {
263+
case deleteChan <- &cleanerJob{
251264
users: deletedUsers,
252265
timestamp: cleanJobTimestamp,
266+
}:
267+
default:
268+
level.Warn(c.logger).Log("msg", "unable to push deletion job to deleteChan")
269+
c.enqueueJobFailed.WithLabelValues(deletedStatus).Inc()
253270
}
254271

255272
case <-ctx.Done():
@@ -392,10 +409,18 @@ func (c *BlocksCleaner) obtainVisitMarkerManager(ctx context.Context, userLogger
392409
}
393410

394411
// Remove blocks and remaining data for tenant marked for deletion.
395-
func (c *BlocksCleaner) deleteUserMarkedForDeletion(ctx context.Context, userLogger log.Logger, userBucket objstore.InstrumentedBucket, userID string) error {
396-
412+
func (c *BlocksCleaner) deleteUserMarkedForDeletion(ctx context.Context, userLogger log.Logger, userBucket objstore.InstrumentedBucket, userID string) (returnErr error) {
413+
startTime := time.Now()
397414
level.Info(userLogger).Log("msg", "deleting blocks for tenant marked for deletion")
415+
defer func() {
416+
if returnErr != nil {
417+
level.Warn(userLogger).Log("msg", "failed deleting tenant marked for deletion", "err", returnErr)
418+
} else {
419+
level.Info(userLogger).Log("msg", "completed deleting tenant marked for deletion", "duration", time.Since(startTime), "duration_ms", time.Since(startTime).Milliseconds())
420+
}
421+
}()
398422

423+
begin := time.Now()
399424
// We immediately delete the bucket index, to signal to its consumers that
400425
// the tenant has "no blocks" in the storage.
401426
if err := bucketindex.DeleteIndex(ctx, c.bucketClient, userID, c.cfgProvider); err != nil {
@@ -465,6 +490,7 @@ func (c *BlocksCleaner) deleteUserMarkedForDeletion(ctx context.Context, userLog
465490
if deletedBlocks.Load() > 0 {
466491
level.Info(userLogger).Log("msg", "deleted blocks for tenant marked for deletion", "deletedBlocks", deletedBlocks.Load())
467492
}
493+
level.Info(userLogger).Log("msg", "completed deleting blocks for tenant marked for deletion", "duration", time.Since(begin), "duration_ms", time.Since(begin).Milliseconds())
468494

469495
mark, err := cortex_tsdb.ReadTenantDeletionMark(ctx, c.bucketClient, userID)
470496
if err != nil {
@@ -491,10 +517,11 @@ func (c *BlocksCleaner) deleteUserMarkedForDeletion(ctx context.Context, userLog
491517
return err
492518
}
493519

494-
if deleted, err := bucket.DeletePrefix(ctx, userBucket, bucketindex.MarkersPathname, userLogger); err != nil {
520+
begin = time.Now()
521+
if deleted, err := bucket.DeletePrefix(ctx, userBucket, bucketindex.MarkersPathname, userLogger, defaultDeleteBlocksConcurrency); err != nil {
495522
return errors.Wrap(err, "failed to delete marker files")
496523
} else if deleted > 0 {
497-
level.Info(userLogger).Log("msg", "deleted marker files for tenant marked for deletion", "count", deleted)
524+
level.Info(userLogger).Log("msg", "deleted marker files for tenant marked for deletion", "count", deleted, "duration", time.Since(begin), "duration_ms", time.Since(begin).Milliseconds())
498525
}
499526
if err := cortex_tsdb.DeleteTenantDeletionMark(ctx, c.bucketClient, userID); err != nil {
500527
return errors.Wrap(err, "failed to delete tenant deletion mark")
@@ -503,18 +530,20 @@ func (c *BlocksCleaner) deleteUserMarkedForDeletion(ctx context.Context, userLog
503530
}
504531

505532
func (c *BlocksCleaner) deleteNonDataFiles(ctx context.Context, userLogger log.Logger, userBucket objstore.InstrumentedBucket) error {
506-
if deleted, err := bucket.DeletePrefix(ctx, userBucket, block.DebugMetas, userLogger); err != nil {
533+
begin := time.Now()
534+
if deleted, err := bucket.DeletePrefix(ctx, userBucket, block.DebugMetas, userLogger, defaultDeleteBlocksConcurrency); err != nil {
507535
return errors.Wrap(err, "failed to delete "+block.DebugMetas)
508536
} else if deleted > 0 {
509-
level.Info(userLogger).Log("msg", "deleted files under "+block.DebugMetas+" for tenant marked for deletion", "count", deleted)
537+
level.Info(userLogger).Log("msg", "deleted files under "+block.DebugMetas+" for tenant marked for deletion", "count", deleted, "duration", time.Since(begin), "duration_ms", time.Since(begin).Milliseconds())
510538
}
511539

512540
if c.cfg.CompactionStrategy == util.CompactionStrategyPartitioning {
541+
begin = time.Now()
513542
// Clean up partitioned group info files
514-
if deleted, err := bucket.DeletePrefix(ctx, userBucket, PartitionedGroupDirectory, userLogger); err != nil {
543+
if deleted, err := bucket.DeletePrefix(ctx, userBucket, PartitionedGroupDirectory, userLogger, defaultDeleteBlocksConcurrency); err != nil {
515544
return errors.Wrap(err, "failed to delete "+PartitionedGroupDirectory)
516545
} else if deleted > 0 {
517-
level.Info(userLogger).Log("msg", "deleted files under "+PartitionedGroupDirectory+" for tenant marked for deletion", "count", deleted)
546+
level.Info(userLogger).Log("msg", "deleted files under "+PartitionedGroupDirectory+" for tenant marked for deletion", "count", deleted, "duration", time.Since(begin), "duration_ms", time.Since(begin).Milliseconds())
518547
}
519548
}
520549
return nil
@@ -531,7 +560,7 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userLogger log.Logger, us
531560
if returnErr != nil {
532561
level.Warn(userLogger).Log("msg", "failed blocks cleanup and maintenance", "err", returnErr)
533562
} else {
534-
level.Info(userLogger).Log("msg", "completed blocks cleanup and maintenance", "duration", time.Since(startTime))
563+
level.Info(userLogger).Log("msg", "completed blocks cleanup and maintenance", "duration", time.Since(startTime), "duration_ms", time.Since(startTime).Milliseconds())
535564
}
536565
c.tenantCleanDuration.WithLabelValues(userID).Set(time.Since(startTime).Seconds())
537566
}()
@@ -771,7 +800,7 @@ func (c *BlocksCleaner) cleanPartitionedGroupInfo(ctx context.Context, userBucke
771800

772801
if extraInfo.status.CanDelete || extraInfo.status.DeleteVisitMarker {
773802
// Remove partition visit markers
774-
if _, err := bucket.DeletePrefix(ctx, userBucket, GetPartitionVisitMarkerDirectoryPath(partitionedGroupInfo.PartitionedGroupID), userLogger); err != nil {
803+
if _, err := bucket.DeletePrefix(ctx, userBucket, GetPartitionVisitMarkerDirectoryPath(partitionedGroupInfo.PartitionedGroupID), userLogger, defaultDeleteBlocksConcurrency); err != nil {
775804
level.Warn(userLogger).Log("msg", "failed to delete partition visit markers for partitioned group", "partitioned_group_info", partitionedGroupInfoFile, "err", err)
776805
} else {
777806
level.Info(userLogger).Log("msg", "deleted partition visit markers for partitioned group", "partitioned_group_info", partitionedGroupInfoFile)

pkg/storage/bucket/bucket_util.go

Lines changed: 28 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -7,27 +7,45 @@ import (
77
"github.com/go-kit/log"
88
"github.com/go-kit/log/level"
99
"github.com/thanos-io/objstore"
10+
"go.uber.org/atomic"
11+
12+
"github.com/cortexproject/cortex/pkg/util/concurrency"
1013
)
1114

1215
// DeletePrefix removes all objects with given prefix, recursively.
1316
// It returns number of deleted objects.
1417
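// Objects under the prefix are listed first; if listing fails, it returns 0 and the error without deleting anything.
// If deletion of any object fails, it returns the error together with the number of objects deleted so far.
// maxConcurrency is passed to concurrency.ForEach (presumably the upper bound on parallel deletions).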
15-
func DeletePrefix(ctx context.Context, bkt objstore.Bucket, prefix string, logger log.Logger) (int, error) {
16-
result := 0
17-
err := bkt.Iter(ctx, prefix, func(name string) error {
18-
if strings.HasSuffix(name, objstore.DirDelim) {
19-
deleted, err := DeletePrefix(ctx, bkt, name, logger)
20-
result += deleted
21-
return err
22-
}
18+
func DeletePrefix(ctx context.Context, bkt objstore.Bucket, prefix string, logger log.Logger, maxConcurrency int) (int, error) {
19+
keys, err := ListPrefixes(ctx, bkt, prefix, logger)
20+
if err != nil {
21+
return 0, err
22+
}
2323

24+
result := atomic.NewInt32(0)
25+
err = concurrency.ForEach(ctx, concurrency.CreateJobsFromStrings(keys), maxConcurrency, func(ctx context.Context, key interface{}) error {
26+
name := key.(string)
2427
if err := bkt.Delete(ctx, name); err != nil {
2528
return err
2629
}
27-
result++
30+
result.Inc()
2831
level.Debug(logger).Log("msg", "deleted file", "file", name)
2932
return nil
3033
})
3134

32-
return result, err
35+
return int(result.Load()), err
36+
}
37+
38+
func ListPrefixes(ctx context.Context, bkt objstore.Bucket, prefix string, logger log.Logger) ([]string, error) {
39+
var keys []string
40+
err := bkt.Iter(ctx, prefix, func(name string) error {
41+
if strings.HasSuffix(name, objstore.DirDelim) {
42+
moreKeys, err := ListPrefixes(ctx, bkt, name, logger)
43+
keys = append(keys, moreKeys...)
44+
return err
45+
}
46+
47+
keys = append(keys, name)
48+
return nil
49+
})
50+
return keys, err
3351
}

pkg/storage/bucket/bucket_util_test.go

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package bucket
22

33
import (
44
"context"
5+
"fmt"
56
"strings"
67
"testing"
78

@@ -21,8 +22,28 @@ func TestDeletePrefix(t *testing.T) {
2122
require.NoError(t, mem.Upload(context.Background(), "prefix/sub2/4", strings.NewReader("hello")))
2223
require.NoError(t, mem.Upload(context.Background(), "outside/obj", strings.NewReader("hello")))
2324

24-
del, err := DeletePrefix(context.Background(), mem, "prefix", log.NewNopLogger())
25+
del, err := DeletePrefix(context.Background(), mem, "prefix", log.NewNopLogger(), 1)
2526
require.NoError(t, err)
2627
assert.Equal(t, 4, del)
2728
assert.Equal(t, 2, len(mem.Objects()))
2829
}
30+
31+
func TestDeletePrefixConcurrent(t *testing.T) {
32+
mem := objstore.NewInMemBucket()
33+
34+
require.NoError(t, mem.Upload(context.Background(), "obj", strings.NewReader("hello")))
35+
require.NoError(t, mem.Upload(context.Background(), "prefix/1", strings.NewReader("hello")))
36+
require.NoError(t, mem.Upload(context.Background(), "prefix/2", strings.NewReader("hello")))
37+
require.NoError(t, mem.Upload(context.Background(), "prefix/sub1/3", strings.NewReader("hello")))
38+
require.NoError(t, mem.Upload(context.Background(), "prefix/sub2/4", strings.NewReader("hello")))
39+
require.NoError(t, mem.Upload(context.Background(), "outside/obj", strings.NewReader("hello")))
40+
n := 10000
41+
for i := 0; i < n; i++ {
42+
require.NoError(t, mem.Upload(context.Background(), fmt.Sprintf("prefix/sub/%d", i), strings.NewReader(fmt.Sprintf("hello%d", i))))
43+
}
44+
45+
del, err := DeletePrefix(context.Background(), mem, "prefix", log.NewNopLogger(), 100)
46+
require.NoError(t, err)
47+
assert.Equal(t, 4+n, del)
48+
assert.Equal(t, 2, len(mem.Objects()))
49+
}

0 commit comments

Comments
 (0)