
Commit c1a2134 (1 parent: 08245d4), authored by Alex Le

Hook up partition compaction end to end implementation (#6510)

* Implemented partition compaction end to end with custom compaction lifecycle
* removed unused variable
* tweak test
* tweak test
* refactor according to comments
* tweak test
* check context error inside sharded posting
* fix lint
* fix integration test for memberlist
* make compactor initial wait cancellable

Signed-off-by: Alex Le <[email protected]>

20 files changed: +3537 −46 lines
Lines changed: 60 additions & 0 deletions (new file)

@@ -0,0 +1,60 @@
+package compactor
+
+import (
+	"context"
+
+	"github.com/prometheus/prometheus/storage"
+	"github.com/prometheus/prometheus/util/annotations"
+)
+
+type backgrounChunkSeriesSet struct {
+	nextSet chan storage.ChunkSeries
+	actual  storage.ChunkSeries
+	cs      storage.ChunkSeriesSet
+}
+
+func (b *backgrounChunkSeriesSet) Next() bool {
+	s, ok := <-b.nextSet
+	b.actual = s
+	return ok
+}
+
+func (b *backgrounChunkSeriesSet) At() storage.ChunkSeries {
+	return b.actual
+}
+
+func (b *backgrounChunkSeriesSet) Err() error {
+	return b.cs.Err()
+}
+
+func (b *backgrounChunkSeriesSet) Warnings() annotations.Annotations {
+	return b.cs.Warnings()
+}
+
+func (b *backgrounChunkSeriesSet) run(ctx context.Context) {
+	for {
+		if !b.cs.Next() {
+			close(b.nextSet)
+			return
+		}
+
+		select {
+		case b.nextSet <- b.cs.At():
+		case <-ctx.Done():
+			return
+		}
+	}
+}
+
+func NewBackgroundChunkSeriesSet(ctx context.Context, cs storage.ChunkSeriesSet) storage.ChunkSeriesSet {
+	r := &backgrounChunkSeriesSet{
+		cs:      cs,
+		nextSet: make(chan storage.ChunkSeries, 1000),
+	}
+
+	go func() {
+		r.run(ctx)
+	}()
+
+	return r
+}
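The new helper decouples iteration over the wrapped storage.ChunkSeriesSet from its consumer: run pushes each series into a buffered channel (capacity 1000) from its own goroutine, so the consumer's Next/At only read from that channel. A minimal usage sketch, assuming a hypothetical caller and upstream set (only NewBackgroundChunkSeriesSet itself comes from this commit):

	// consumeSeries is illustrative only; `upstream` stands in for whatever ChunkSeriesSet the compactor produces.
	func consumeSeries(ctx context.Context, upstream storage.ChunkSeriesSet) error {
		set := NewBackgroundChunkSeriesSet(ctx, upstream) // prefetching starts immediately in a background goroutine
		for set.Next() {                                  // receives the next buffered series; stops when the channel is closed
			series := set.At()
			_ = series.Labels() // process the series here (merge, write chunks, ...)
		}
		return set.Err() // surfaces any error from the underlying set
	}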

pkg/compactor/blocks_cleaner.go

Lines changed: 5 additions & 5 deletions

@@ -152,23 +152,23 @@ func NewBlocksCleaner(
 		tenantBlocks: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
 			Name: "cortex_bucket_blocks_count",
 			Help: "Total number of blocks in the bucket. Includes blocks marked for deletion, but not partial blocks.",
-		}, []string{"user"}),
+		}, commonLabels),
 		tenantBlocksMarkedForDelete: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
 			Name: "cortex_bucket_blocks_marked_for_deletion_count",
 			Help: "Total number of blocks marked for deletion in the bucket.",
-		}, []string{"user"}),
+		}, commonLabels),
 		tenantBlocksMarkedForNoCompaction: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
 			Name: "cortex_bucket_blocks_marked_for_no_compaction_count",
 			Help: "Total number of blocks marked for no compaction in the bucket.",
-		}, []string{"user"}),
+		}, commonLabels),
 		tenantPartialBlocks: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
 			Name: "cortex_bucket_blocks_partials_count",
 			Help: "Total number of partial blocks.",
-		}, []string{"user"}),
+		}, commonLabels),
 		tenantBucketIndexLastUpdate: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
 			Name: "cortex_bucket_index_last_successful_update_timestamp_seconds",
 			Help: "Timestamp of the last successful update of a tenant's bucket index.",
-		}, []string{"user"}),
+		}, commonLabels),
 		tenantBlocksCleanedTotal: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
 			Name: "cortex_bucket_blocks_cleaned_total",
 			Help: "Total number of blocks deleted for a tenant.",
pkg/compactor/compactor.go

Lines changed: 131 additions & 30 deletions

@@ -160,6 +160,30 @@
 		}
 		return compactor, plannerFactory, nil
 	}
+
+	DefaultBlockDeletableCheckerFactory = func(_ context.Context, _ objstore.InstrumentedBucket, _ log.Logger) compact.BlockDeletableChecker {
+		return compact.DefaultBlockDeletableChecker{}
+	}
+
+	PartitionCompactionBlockDeletableCheckerFactory = func(ctx context.Context, bkt objstore.InstrumentedBucket, logger log.Logger) compact.BlockDeletableChecker {
+		return NewPartitionCompactionBlockDeletableChecker()
+	}
+
+	DefaultCompactionLifecycleCallbackFactory = func(_ context.Context, _ objstore.InstrumentedBucket, _ log.Logger, _ int, _ string, _ string, _ *compactorMetrics) compact.CompactionLifecycleCallback {
+		return compact.DefaultCompactionLifecycleCallback{}
+	}
+
+	ShardedCompactionLifecycleCallbackFactory = func(ctx context.Context, userBucket objstore.InstrumentedBucket, logger log.Logger, metaSyncConcurrency int, compactDir string, userID string, compactorMetrics *compactorMetrics) compact.CompactionLifecycleCallback {
+		return NewShardedCompactionLifecycleCallback(
+			ctx,
+			userBucket,
+			logger,
+			metaSyncConcurrency,
+			compactDir,
+			userID,
+			compactorMetrics,
+		)
+	}
 )
 
 // BlocksGrouperFactory builds and returns the grouper to use to compact a tenant's blocks.
@@ -202,6 +226,22 @@ type PlannerFactory func(
 	compactorMetrics *compactorMetrics,
 ) compact.Planner
 
+type CompactionLifecycleCallbackFactory func(
+	ctx context.Context,
+	userBucket objstore.InstrumentedBucket,
+	logger log.Logger,
+	metaSyncConcurrency int,
+	compactDir string,
+	userID string,
+	compactorMetrics *compactorMetrics,
+) compact.CompactionLifecycleCallback
+
+type BlockDeletableCheckerFactory func(
+	ctx context.Context,
+	bkt objstore.InstrumentedBucket,
+	logger log.Logger,
+) compact.BlockDeletableChecker
+
 // Limits defines limits used by the Compactor.
 type Limits interface {
 	CompactorTenantShardSize(userID string) int
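Both new factory types are plain function types, so any function with the matching shape can be wired into the compactor. As a hedged sketch (not part of this commit), a no-op lifecycle factory that ignores its inputs and returns Thanos' default callback would look like this:

	// Hypothetical example: satisfies CompactionLifecycleCallbackFactory, mirroring the
	// DefaultCompactionLifecycleCallbackFactory variable defined earlier in this file.
	var noopLifecycleFactory CompactionLifecycleCallbackFactory = func(
		_ context.Context, _ objstore.InstrumentedBucket, _ log.Logger,
		_ int, _ string, _ string, _ *compactorMetrics,
	) compact.CompactionLifecycleCallback {
		return compact.DefaultCompactionLifecycleCallback{}
	}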
@@ -380,6 +420,10 @@ type Compactor struct {
 
 	blocksPlannerFactory PlannerFactory
 
+	blockDeletableCheckerFactory BlockDeletableCheckerFactory
+
+	compactionLifecycleCallbackFactory CompactionLifecycleCallbackFactory
+
 	// Client used to run operations on the bucket storing blocks.
 	bucketClient objstore.InstrumentedBucket
 
@@ -436,11 +480,25 @@ func NewCompactor(compactorCfg Config, storageCfg cortex_tsdb.BlocksStorageConfig,
 		}
 	}
 
+	var blockDeletableCheckerFactory BlockDeletableCheckerFactory
+	if compactorCfg.ShardingStrategy == util.ShardingStrategyShuffle && compactorCfg.CompactionStrategy == util.CompactionStrategyPartitioning {
+		blockDeletableCheckerFactory = PartitionCompactionBlockDeletableCheckerFactory
+	} else {
+		blockDeletableCheckerFactory = DefaultBlockDeletableCheckerFactory
+	}
+
+	var compactionLifecycleCallbackFactory CompactionLifecycleCallbackFactory
+	if compactorCfg.ShardingStrategy == util.ShardingStrategyShuffle && compactorCfg.CompactionStrategy == util.CompactionStrategyPartitioning {
+		compactionLifecycleCallbackFactory = ShardedCompactionLifecycleCallbackFactory
+	} else {
+		compactionLifecycleCallbackFactory = DefaultCompactionLifecycleCallbackFactory
+	}
+
 	if ingestionReplicationFactor <= 0 {
 		ingestionReplicationFactor = 1
 	}
 
-	cortexCompactor, err := newCompactor(compactorCfg, storageCfg, logger, registerer, bucketClientFactory, blocksGrouperFactory, blocksCompactorFactory, limits, ingestionReplicationFactor)
+	cortexCompactor, err := newCompactor(compactorCfg, storageCfg, logger, registerer, bucketClientFactory, blocksGrouperFactory, blocksCompactorFactory, blockDeletableCheckerFactory, compactionLifecycleCallbackFactory, limits, ingestionReplicationFactor)
 	if err != nil {
 		return nil, errors.Wrap(err, "failed to create Cortex blocks compactor")
 	}
@@ -456,6 +514,8 @@ func newCompactor(
 	bucketClientFactory func(ctx context.Context) (objstore.InstrumentedBucket, error),
 	blocksGrouperFactory BlocksGrouperFactory,
 	blocksCompactorFactory BlocksCompactorFactory,
+	blockDeletableCheckerFactory BlockDeletableCheckerFactory,
+	compactionLifecycleCallbackFactory CompactionLifecycleCallbackFactory,
 	limits *validation.Overrides,
 	ingestionReplicationFactor int,
 ) (*Compactor, error) {
@@ -466,15 +526,17 @@
 		compactorMetrics = newDefaultCompactorMetrics(registerer)
 	}
 	c := &Compactor{
-		compactorCfg:           compactorCfg,
-		storageCfg:             storageCfg,
-		parentLogger:           logger,
-		logger:                 log.With(logger, "component", "compactor"),
-		registerer:             registerer,
-		bucketClientFactory:    bucketClientFactory,
-		blocksGrouperFactory:   blocksGrouperFactory,
-		blocksCompactorFactory: blocksCompactorFactory,
-		allowedTenants:         util.NewAllowedTenants(compactorCfg.EnabledTenants, compactorCfg.DisabledTenants),
+		compactorCfg:                       compactorCfg,
+		storageCfg:                         storageCfg,
+		parentLogger:                       logger,
+		logger:                             log.With(logger, "component", "compactor"),
+		registerer:                         registerer,
+		bucketClientFactory:                bucketClientFactory,
+		blocksGrouperFactory:               blocksGrouperFactory,
+		blocksCompactorFactory:             blocksCompactorFactory,
+		blockDeletableCheckerFactory:       blockDeletableCheckerFactory,
+		compactionLifecycleCallbackFactory: compactionLifecycleCallbackFactory,
+		allowedTenants:                     util.NewAllowedTenants(compactorCfg.EnabledTenants, compactorCfg.DisabledTenants),
 
 		CompactorStartDurationSeconds: promauto.With(registerer).NewGauge(prometheus.GaugeOpts{
 			Name: "cortex_compactor_start_duration_seconds",
@@ -662,12 +724,6 @@ func (c *Compactor) starting(ctx context.Context) error {
 	}, c.bucketClient, c.usersScanner, c.compactorCfg.CompactionVisitMarkerTimeout, c.limits, c.parentLogger, cleanerRingLifecyclerID, c.registerer, c.compactorCfg.CleanerVisitMarkerTimeout, c.compactorCfg.CleanerVisitMarkerFileUpdateInterval,
 		c.compactorMetrics.syncerBlocksMarkedForDeletion, c.compactorMetrics.remainingPlannedCompactions)
 
-	// Ensure an initial cleanup occurred before starting the compactor.
-	if err := services.StartAndAwaitRunning(ctx, c.blocksCleaner); err != nil {
-		c.ringSubservices.StopAsync()
-		return errors.Wrap(err, "failed to start the blocks cleaner")
-	}
-
 	if c.compactorCfg.CachingBucketEnabled {
 		matchers := cortex_tsdb.NewMatchers()
 		// Do not cache tenant deletion marker and block deletion marker for compactor

@@ -698,15 +754,30 @@ func (c *Compactor) stopping(_ error) error {
 }
 
 func (c *Compactor) running(ctx context.Context) error {
+	// Ensure an initial cleanup occurs as the first thing when the compactor starts running.
+	if err := services.StartAndAwaitRunning(ctx, c.blocksCleaner); err != nil {
+		c.ringSubservices.StopAsync()
+		return errors.Wrap(err, "failed to start the blocks cleaner")
+	}
+
 	// Run an initial compaction before starting the interval.
+	// Insert jitter right before compaction starts so that multiple starting compactors are not in sync.
+	select {
+	case <-ctx.Done():
+		return ctx.Err()
+	case <-time.After(time.Duration(rand.Int63n(int64(float64(c.compactorCfg.CompactionInterval) * 0.1)))):
+	}
 	c.compactUsers(ctx)
 
-	ticker := time.NewTicker(util.DurationWithJitter(c.compactorCfg.CompactionInterval, 0.05))
+	ticker := time.NewTicker(c.compactorCfg.CompactionInterval)
 	defer ticker.Stop()
 
 	for {
 		select {
 		case <-ticker.C:
+			// Insert jitter right before compaction starts, so that there is always
+			// jitter even when compaction takes longer than CompactionInterval.
+			time.Sleep(time.Duration(rand.Int63n(int64(float64(c.compactorCfg.CompactionInterval) * 0.1))))
 			c.compactUsers(ctx)
 		case <-ctx.Done():
 			return nil
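With this change the ticker fires at exactly CompactionInterval, and jitter is applied separately as a random wait of up to 10% of the interval before each cycle (cancellable via the context before the first one). A worked example of the jitter bound, assuming a 2h interval (the real value comes from the compactor's CompactionInterval configuration):

	package main

	import (
		"fmt"
		"math/rand"
		"time"
	)

	func main() {
		interval := 2 * time.Hour
		maxJitter := int64(float64(interval) * 0.1)   // 10% of the interval = 12 minutes
		wait := time.Duration(rand.Int63n(maxJitter)) // uniform in [0, 12m), same expression as in the diff
		fmt.Println(wait < 12*time.Minute)            // always true
	}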
@@ -717,23 +788,19 @@ func (c *Compactor) running(ctx context.Context) error {
 }
 
 func (c *Compactor) compactUsers(ctx context.Context) {
-	failed := false
+	succeeded := false
 	interrupted := false
+	compactionErrorCount := 0
 
 	c.CompactionRunsStarted.Inc()
 
 	defer func() {
-		// interruptions and successful runs are considered
-		// mutually exclusive but we consider a run failed if any
-		// tenant runs failed even if later runs are interrupted
-		if !interrupted && !failed {
+		if succeeded && compactionErrorCount == 0 {
 			c.CompactionRunsCompleted.Inc()
 			c.CompactionRunsLastSuccess.SetToCurrentTime()
-		}
-		if interrupted {
+		} else if interrupted {
 			c.CompactionRunsInterrupted.Inc()
-		}
-		if failed {
+		} else {
 			c.CompactionRunsFailed.Inc()
 		}
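After this change a run is counted as completed only if compactUsers reached its end (succeeded) with zero per-tenant compaction errors; otherwise it is counted as interrupted or failed. A small standalone sketch of the resulting classification (illustration only, not part of the diff):

	// classifyRun mirrors the defer block above: completed, interrupted, or failed.
	func classifyRun(succeeded, interrupted bool, compactionErrorCount int) string {
		switch {
		case succeeded && compactionErrorCount == 0:
			return "completed"
		case interrupted:
			return "interrupted"
		default:
			return "failed"
		}
	}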

@@ -747,7 +814,6 @@ func (c *Compactor) compactUsers(ctx context.Context) {
 	level.Info(c.logger).Log("msg", "discovering users from bucket")
 	users, err := c.discoverUsersWithRetries(ctx)
 	if err != nil {
-		failed = true
 		level.Error(c.logger).Log("msg", "failed to discover users from bucket", "err", err)
 		return
 	}

@@ -816,7 +882,7 @@
 			}
 
 			c.CompactionRunFailedTenants.Inc()
-			failed = true
+			compactionErrorCount++
 			level.Error(c.logger).Log("msg", "failed to compact user blocks", "user", userID, "err", err)
 			continue
 		}

@@ -851,6 +917,7 @@
 			}
 		}
 	}
+	succeeded = true
 }
 
 func (c *Compactor) compactUserWithRetries(ctx context.Context, userID string) error {

@@ -885,6 +952,11 @@ func (c *Compactor) compactUserWithRetries(ctx context.Context, userID string) error {
 		retries.Wait()
 	}
 
+	err := errors.Unwrap(errors.Cause(lastErr))
+	if errors.Is(err, plannerCompletedPartitionError) || errors.Is(err, plannerVisitedPartitionError) {
+		return nil
+	}
+
 	return lastErr
 }
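The partition planner can now abort a tenant's compaction with sentinel errors when a partition was already visited or completed; compactUserWithRetries strips one pkg/errors layer (Cause) and one %w layer (Unwrap) before comparing, and treats those cases as success. A self-contained sketch of the unwrap pattern, using a hypothetical sentinel in place of plannerCompletedPartitionError / plannerVisitedPartitionError:

	package main

	import (
		"fmt"

		"github.com/pkg/errors"
	)

	// errPartitionVisited stands in for the planner's sentinel errors, which are defined elsewhere.
	var errPartitionVisited = errors.New("partition already visited")

	func main() {
		// Assumed shape of lastErr: a %w wrap added by the planner, then a pkg/errors wrap added higher up.
		lastErr := errors.Wrap(fmt.Errorf("plan compaction: %w", errPartitionVisited), "compact user")

		inner := errors.Unwrap(errors.Cause(lastErr))      // Cause strips the pkg/errors layer, Unwrap the %w layer
		fmt.Println(errors.Is(inner, errPartitionVisited)) // true -> the retry loop returns nil instead of the error
	}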

@@ -898,7 +970,12 @@ func (c *Compactor) compactUser(ctx context.Context, userID string) error {
 
 	// Filters out duplicate blocks that can be formed from two or more overlapping
 	// blocks that fully submatches the source blocks of the older blocks.
-	deduplicateBlocksFilter := block.NewDeduplicateFilter(c.compactorCfg.BlockSyncConcurrency)
+	var deduplicateBlocksFilter CortexMetadataFilter
+	if c.compactorCfg.ShardingStrategy == util.ShardingStrategyShuffle && c.compactorCfg.CompactionStrategy == util.CompactionStrategyPartitioning {
+		deduplicateBlocksFilter = &disabledDeduplicateFilter{}
+	} else {
+		deduplicateBlocksFilter = block.NewDeduplicateFilter(c.compactorCfg.BlockSyncConcurrency)
+	}
 
 	// While fetching blocks, we filter out blocks that were marked for deletion by using IgnoreDeletionMarkFilter.
 	// No delay is used -- all blocks with deletion marker are ignored, and not considered for compaction.

@@ -966,12 +1043,14 @@ func (c *Compactor) compactUser(ctx context.Context, userID string) error {
 
 	currentCtx, cancel := context.WithCancel(ctx)
 	defer cancel()
-	compactor, err := compact.NewBucketCompactor(
+	compactor, err := compact.NewBucketCompactorWithCheckerAndCallback(
 		ulogger,
 		syncer,
 		c.blocksGrouperFactory(currentCtx, c.compactorCfg, bucket, ulogger, c.BlocksMarkedForNoCompaction, c.blockVisitMarkerReadFailed, c.blockVisitMarkerWriteFailed, syncerMetrics, c.compactorMetrics, c.ring, c.ringLifecycler, c.limits, userID, noCompactMarkerFilter, c.ingestionReplicationFactor),
 		c.blocksPlannerFactory(currentCtx, bucket, ulogger, c.compactorCfg, noCompactMarkerFilter, c.ringLifecycler, userID, c.blockVisitMarkerReadFailed, c.blockVisitMarkerWriteFailed, c.compactorMetrics),
 		c.blocksCompactor,
+		c.blockDeletableCheckerFactory(currentCtx, bucket, ulogger),
+		c.compactionLifecycleCallbackFactory(currentCtx, bucket, ulogger, c.compactorCfg.MetaSyncConcurrency, c.compactDirForUser(userID), userID, c.compactorMetrics),
 		c.compactDirForUser(userID),
 		bucket,
 		c.compactorCfg.CompactionConcurrency,

@@ -982,6 +1061,7 @@
 	}
 
 	if err := compactor.Compact(ctx); err != nil {
+		level.Warn(ulogger).Log("msg", "compaction failed with error", "err", err)
 		return errors.Wrap(err, "compaction")
 	}
 
@@ -1148,3 +1228,24 @@ func (c *Compactor) isPermissionDeniedErr(err error) bool {
 	}
 	return s.Code() == codes.PermissionDenied
 }
+
+type CortexMetadataFilter interface {
+	block.DeduplicateFilter
+	block.MetadataFilter
+}
+
+// disabledDeduplicateFilter is only used by partitioning compaction, which always generates
+// multiple result blocks (different partitions) for the same time range. Those result blocks
+// share the same source blocks and must not be marked as duplicates when grouping for the
+// next level of compaction, so the DeduplicateFilter is disabled.
+type disabledDeduplicateFilter struct {
+}
+
+func (f *disabledDeduplicateFilter) Filter(ctx context.Context, metas map[ulid.ULID]*metadata.Meta, synced block.GaugeVec, modified block.GaugeVec) error {
+	// don't do any deduplicate filtering
+	return nil
+}
+
+func (f *disabledDeduplicateFilter) DuplicateIDs() []ulid.ULID {
+	return nil
+}
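Because the syncer needs both the duplicate-ID view and the metadata-filtering behaviour, the stub has to keep satisfying the combined CortexMetadataFilter interface. A common Go idiom to lock that in is a compile-time assertion; a one-line sketch (not part of the commit):

	// Compile-time check that the disabled filter implements both embedded interfaces.
	var _ CortexMetadataFilter = &disabledDeduplicateFilter{}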
