Skip to content

Commit 8e3674d

Browse files
authored
Add parquet convert failure, block delay and total block to convert metrics (#6821)
* add parquet converter failures, block delay and total unconverted block metrics Signed-off-by: yeya24 <[email protected]>
* changelog Signed-off-by: yeya24 <[email protected]>
* address comment Signed-off-by: yeya24 <[email protected]>
---------
Signed-off-by: yeya24 <[email protected]>
1 parent 826e32a commit 8e3674d

File tree

11 files changed

+417
-52
lines changed

11 files changed

+417
-52
lines changed

CHANGELOG.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@
3939
* [ENHANCEMENT] Compactor: Support metadata caching bucket for Cleaner. Can be enabled via `-compactor.cleaner-caching-bucket-enabled` flag. #6778
4040
* [ENHANCEMENT] Compactor, Store Gateway: Introduce user scanner strategy and user index. #6780
4141
* [ENHANCEMENT] Querier: Support chunks cache for parquet queryable. #6805
42-
* [ENHANCEMENT] Parquet Storage: Add some metrics for parquet blocks and converter. #6809
42+
* [ENHANCEMENT] Parquet Storage: Add some metrics for parquet blocks and converter. #6809 #6821
4343
* [ENHANCEMENT] Compactor: Optimize cleaner run time. #6815
4444
* [ENHANCEMENT] Parquet Storage: Allow percentage based dynamic shard size for Parquet Converter. #6817
4545
* [BUGFIX] Ingester: Avoid error or early throttling when READONLY ingesters are present in the ring #6517

integration/parquet_querier_test.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,10 @@ func TestParquetFuzz(t *testing.T) {
4141
flags := mergeFlags(
4242
baseFlags,
4343
map[string]string{
44-
"-target": "all,parquet-converter",
45-
"-blocks-storage.tsdb.ship-interval": "1s",
46-
"-blocks-storage.bucket-store.sync-interval": "1s",
44+
"-target": "all,parquet-converter",
45+
"-blocks-storage.tsdb.block-ranges-period": "1m,24h",
46+
"-blocks-storage.tsdb.ship-interval": "1s",
47+
"-blocks-storage.bucket-store.sync-interval": "1s",
4748
"-blocks-storage.bucket-store.metadata-cache.bucket-index-content-ttl": "1s",
4849
"-blocks-storage.bucket-store.bucket-index.idle-timeout": "1s",
4950
"-blocks-storage.bucket-store.bucket-index.enabled": "true",

pkg/compactor/blocks_cleaner.go

Lines changed: 28 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"go.uber.org/atomic"
2020

2121
"github.com/cortexproject/cortex/pkg/storage/bucket"
22+
cortex_parquet "github.com/cortexproject/cortex/pkg/storage/parquet"
2223
cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb"
2324
"github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex"
2425
"github.com/cortexproject/cortex/pkg/storage/tsdb/users"
@@ -43,6 +44,7 @@ type BlocksCleanerConfig struct {
4344
TenantCleanupDelay time.Duration // Delay before removing tenant deletion mark and "debug".
4445
ShardingStrategy string
4546
CompactionStrategy string
47+
BlockRanges []int64
4648
}
4749

4850
type BlocksCleaner struct {
@@ -73,6 +75,7 @@ type BlocksCleaner struct {
7375
blocksMarkedForDeletion *prometheus.CounterVec
7476
tenantBlocks *prometheus.GaugeVec
7577
tenantParquetBlocks *prometheus.GaugeVec
78+
tenantParquetUnConvertedBlocks *prometheus.GaugeVec
7679
tenantBlocksMarkedForDelete *prometheus.GaugeVec
7780
tenantBlocksMarkedForNoCompaction *prometheus.GaugeVec
7881
tenantPartialBlocks *prometheus.GaugeVec
@@ -160,6 +163,10 @@ func NewBlocksCleaner(
160163
Name: "cortex_bucket_parquet_blocks_count",
161164
Help: "Total number of parquet blocks in the bucket. Blocks marked for deletion are included.",
162165
}, commonLabels),
166+
tenantParquetUnConvertedBlocks: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
167+
Name: "cortex_bucket_parquet_unconverted_blocks_count",
168+
Help: "Total number of unconverted parquet blocks in the bucket. Blocks marked for deletion are included.",
169+
}, commonLabels),
163170
tenantBlocksMarkedForDelete: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
164171
Name: "cortex_bucket_blocks_marked_for_deletion_count",
165172
Help: "Total number of blocks marked for deletion in the bucket.",
@@ -377,6 +384,7 @@ func (c *BlocksCleaner) scanUsers(ctx context.Context) ([]string, []string, erro
377384
if !isActive[userID] && !isMarkedForDeletion[userID] {
378385
c.tenantBlocks.DeleteLabelValues(userID)
379386
c.tenantParquetBlocks.DeleteLabelValues(userID)
387+
c.tenantParquetUnConvertedBlocks.DeleteLabelValues(userID)
380388
c.tenantBlocksMarkedForDelete.DeleteLabelValues(userID)
381389
c.tenantBlocksMarkedForNoCompaction.DeleteLabelValues(userID)
382390
c.tenantPartialBlocks.DeleteLabelValues(userID)
@@ -483,6 +491,7 @@ func (c *BlocksCleaner) deleteUserMarkedForDeletion(ctx context.Context, userLog
483491
// Given all blocks have been deleted, we can also remove the metrics.
484492
c.tenantBlocks.DeleteLabelValues(userID)
485493
c.tenantParquetBlocks.DeleteLabelValues(userID)
494+
c.tenantParquetUnConvertedBlocks.DeleteLabelValues(userID)
486495
c.tenantBlocksMarkedForDelete.DeleteLabelValues(userID)
487496
c.tenantBlocksMarkedForNoCompaction.DeleteLabelValues(userID)
488497
c.tenantPartialBlocks.DeleteLabelValues(userID)
@@ -708,14 +717,7 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userLogger log.Logger, us
708717
}
709718
level.Info(userLogger).Log("msg", "finish writing new index", "duration", time.Since(begin), "duration_ms", time.Since(begin).Milliseconds())
710719
}
711-
c.tenantBlocks.WithLabelValues(userID).Set(float64(len(idx.Blocks)))
712-
c.tenantBlocksMarkedForDelete.WithLabelValues(userID).Set(float64(len(idx.BlockDeletionMarks)))
713-
c.tenantBlocksMarkedForNoCompaction.WithLabelValues(userID).Set(float64(totalBlocksBlocksMarkedForNoCompaction))
714-
c.tenantPartialBlocks.WithLabelValues(userID).Set(float64(len(partials)))
715-
c.tenantBucketIndexLastUpdate.WithLabelValues(userID).SetToCurrentTime()
716-
if parquetEnabled {
717-
c.tenantParquetBlocks.WithLabelValues(userID).Set(float64(len(idx.ParquetBlocks())))
718-
}
720+
c.updateBucketMetrics(userID, parquetEnabled, idx, float64(len(partials)), float64(totalBlocksBlocksMarkedForNoCompaction))
719721

720722
if c.cfg.ShardingStrategy == util.ShardingStrategyShuffle && c.cfg.CompactionStrategy == util.CompactionStrategyPartitioning {
721723
begin = time.Now()
@@ -725,6 +727,24 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userLogger log.Logger, us
725727
return nil
726728
}
727729

730+
func (c *BlocksCleaner) updateBucketMetrics(userID string, parquetEnabled bool, idx *bucketindex.Index, partials, totalBlocksBlocksMarkedForNoCompaction float64) {
731+
c.tenantBlocks.WithLabelValues(userID).Set(float64(len(idx.Blocks)))
732+
c.tenantBlocksMarkedForDelete.WithLabelValues(userID).Set(float64(len(idx.BlockDeletionMarks)))
733+
c.tenantBlocksMarkedForNoCompaction.WithLabelValues(userID).Set(totalBlocksBlocksMarkedForNoCompaction)
734+
c.tenantPartialBlocks.WithLabelValues(userID).Set(float64(partials))
735+
c.tenantBucketIndexLastUpdate.WithLabelValues(userID).SetToCurrentTime()
736+
if parquetEnabled {
737+
c.tenantParquetBlocks.WithLabelValues(userID).Set(float64(len(idx.ParquetBlocks())))
738+
remainingBlocksToConvert := 0
739+
for _, b := range idx.NonParquetBlocks() {
740+
if cortex_parquet.ShouldConvertBlockToParquet(b.MinTime, b.MaxTime, c.cfg.BlockRanges) {
741+
remainingBlocksToConvert++
742+
}
743+
}
744+
c.tenantParquetUnConvertedBlocks.WithLabelValues(userID).Set(float64(remainingBlocksToConvert))
745+
}
746+
}
747+
728748
func (c *BlocksCleaner) cleanPartitionedGroupInfo(ctx context.Context, userBucket objstore.InstrumentedBucket, userLogger log.Logger, userID string) {
729749
existentPartitionedGroupInfo := make(map[*PartitionedGroupInfo]struct {
730750
path string

pkg/compactor/blocks_cleaner_test.go

Lines changed: 107 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ func TestBlockCleaner_KeyPermissionDenied(t *testing.T) {
8080
DeletionDelay: deletionDelay,
8181
CleanupInterval: time.Minute,
8282
CleanupConcurrency: 1,
83+
BlockRanges: (&tsdb.DurationList{2 * time.Hour, 12 * time.Hour, 24 * time.Hour}).ToMilliseconds(),
8384
}
8485

8586
logger := log.NewNopLogger()
@@ -182,6 +183,8 @@ func testBlocksCleanerWithOptions(t *testing.T, options testBlocksCleanerOptions
182183

183184
// Create Parquet marker
184185
block13 := createTSDBBlock(t, bucketClient, "user-6", 30, 50, nil)
186+
// This block has not been converted to Parquet format yet, so it is counted as a remaining unconverted block.
187+
block14 := createTSDBBlock(t, bucketClient, "user-6", 30, 50, nil)
185188
createParquetMarker(t, bucketClient, "user-6", block13)
186189

187190
// The fixtures have been created. If the bucket client wasn't wrapped to write
@@ -196,6 +199,7 @@ func testBlocksCleanerWithOptions(t *testing.T, options testBlocksCleanerOptions
196199
CleanupConcurrency: options.concurrency,
197200
BlockDeletionMarksMigrationEnabled: options.markersMigrationEnabled,
198201
TenantCleanupDelay: options.tenantDeletionDelay,
202+
BlockRanges: (&tsdb.DurationList{2 * time.Hour}).ToMilliseconds(),
199203
}
200204

201205
reg := prometheus.NewPedanticRegistry()
@@ -251,6 +255,7 @@ func testBlocksCleanerWithOptions(t *testing.T, options testBlocksCleanerOptions
251255
{path: path.Join("user-3", block10.String(), parquet.ConverterMarkerFileName), expectedExists: false},
252256
{path: path.Join("user-4", block.DebugMetas, "meta.json"), expectedExists: options.user4FilesExist},
253257
{path: path.Join("user-6", block13.String(), parquet.ConverterMarkerFileName), expectedExists: true},
258+
{path: path.Join("user-6", block14.String(), parquet.ConverterMarkerFileName), expectedExists: false},
254259
} {
255260
exists, err := bucketClient.Exists(ctx, tc.path)
256261
require.NoError(t, err)
@@ -296,6 +301,11 @@ func testBlocksCleanerWithOptions(t *testing.T, options testBlocksCleanerOptions
296301
}, {
297302
userID: "user-3",
298303
expectedIndex: false,
304+
}, {
305+
userID: "user-6",
306+
expectedIndex: true,
307+
expectedBlocks: []ulid.ULID{block13, block14},
308+
expectedMarks: []ulid.ULID{},
299309
},
300310
} {
301311
idx, err := bucketindex.ReadIndex(ctx, bucketClient, tc.userID, nil, logger)
@@ -318,7 +328,7 @@ func testBlocksCleanerWithOptions(t *testing.T, options testBlocksCleanerOptions
318328
cortex_bucket_blocks_count{user="user-1"} 2
319329
cortex_bucket_blocks_count{user="user-2"} 1
320330
cortex_bucket_blocks_count{user="user-5"} 2
321-
cortex_bucket_blocks_count{user="user-6"} 1
331+
cortex_bucket_blocks_count{user="user-6"} 2
322332
# HELP cortex_bucket_blocks_marked_for_deletion_count Total number of blocks marked for deletion in the bucket.
323333
# TYPE cortex_bucket_blocks_marked_for_deletion_count gauge
324334
cortex_bucket_blocks_marked_for_deletion_count{user="user-1"} 1
@@ -341,9 +351,14 @@ func testBlocksCleanerWithOptions(t *testing.T, options testBlocksCleanerOptions
341351
# TYPE cortex_bucket_parquet_blocks_count gauge
342352
cortex_bucket_parquet_blocks_count{user="user-5"} 0
343353
cortex_bucket_parquet_blocks_count{user="user-6"} 1
354+
# HELP cortex_bucket_parquet_unconverted_blocks_count Total number of unconverted parquet blocks in the bucket. Blocks marked for deletion are included.
355+
# TYPE cortex_bucket_parquet_unconverted_blocks_count gauge
356+
cortex_bucket_parquet_unconverted_blocks_count{user="user-5"} 0
357+
cortex_bucket_parquet_unconverted_blocks_count{user="user-6"} 0
344358
`),
345359
"cortex_bucket_blocks_count",
346360
"cortex_bucket_parquet_blocks_count",
361+
"cortex_bucket_parquet_unconverted_blocks_count",
347362
"cortex_bucket_blocks_marked_for_deletion_count",
348363
"cortex_bucket_blocks_marked_for_no_compaction_count",
349364
"cortex_bucket_blocks_partials_count",
@@ -378,6 +393,7 @@ func TestBlocksCleaner_ShouldContinueOnBlockDeletionFailure(t *testing.T) {
378393
DeletionDelay: deletionDelay,
379394
CleanupInterval: time.Minute,
380395
CleanupConcurrency: 1,
396+
BlockRanges: (&tsdb.DurationList{2 * time.Hour, 12 * time.Hour, 24 * time.Hour}).ToMilliseconds(),
381397
}
382398

383399
logger := log.NewNopLogger()
@@ -447,6 +463,7 @@ func TestBlocksCleaner_ShouldRebuildBucketIndexOnCorruptedOne(t *testing.T) {
447463
DeletionDelay: deletionDelay,
448464
CleanupInterval: time.Minute,
449465
CleanupConcurrency: 1,
466+
BlockRanges: (&tsdb.DurationList{2 * time.Hour, 12 * time.Hour, 24 * time.Hour}).ToMilliseconds(),
450467
}
451468

452469
logger := log.NewNopLogger()
@@ -508,6 +525,7 @@ func TestBlocksCleaner_ShouldRemoveMetricsForTenantsNotBelongingAnymoreToTheShar
508525
DeletionDelay: time.Hour,
509526
CleanupInterval: time.Minute,
510527
CleanupConcurrency: 1,
528+
BlockRanges: (&tsdb.DurationList{2 * time.Hour, 12 * time.Hour, 24 * time.Hour}).ToMilliseconds(),
511529
}
512530

513531
ctx := context.Background()
@@ -657,6 +675,7 @@ func TestBlocksCleaner_ShouldRemoveBlocksOutsideRetentionPeriod(t *testing.T) {
657675
DeletionDelay: time.Hour,
658676
CleanupInterval: time.Minute,
659677
CleanupConcurrency: 1,
678+
BlockRanges: (&tsdb.DurationList{2 * time.Hour, 12 * time.Hour, 24 * time.Hour}).ToMilliseconds(),
660679
}
661680

662681
ctx := context.Background()
@@ -889,6 +908,7 @@ func TestBlocksCleaner_CleanPartitionedGroupInfo(t *testing.T) {
889908
CleanupConcurrency: 1,
890909
ShardingStrategy: util.ShardingStrategyShuffle,
891910
CompactionStrategy: util.CompactionStrategyPartitioning,
911+
BlockRanges: (&tsdb.DurationList{2 * time.Hour, 12 * time.Hour, 24 * time.Hour}).ToMilliseconds(),
892912
}
893913

894914
ctx := context.Background()
@@ -964,6 +984,7 @@ func TestBlocksCleaner_DeleteEmptyBucketIndex(t *testing.T) {
964984
CleanupConcurrency: 1,
965985
ShardingStrategy: util.ShardingStrategyShuffle,
966986
CompactionStrategy: util.CompactionStrategyPartitioning,
987+
BlockRanges: (&tsdb.DurationList{2 * time.Hour, 12 * time.Hour, 24 * time.Hour}).ToMilliseconds(),
967988
}
968989

969990
ctx := context.Background()
@@ -1021,6 +1042,91 @@ func TestBlocksCleaner_DeleteEmptyBucketIndex(t *testing.T) {
10211042
require.True(t, userBucket.IsObjNotFoundErr(err))
10221043
}
10231044

1045+
func TestBlocksCleaner_ParquetMetrics(t *testing.T) {
1046+
// Create metrics
1047+
reg := prometheus.NewPedanticRegistry()
1048+
blocksMarkedForDeletion := promauto.With(reg).NewCounterVec(
1049+
prometheus.CounterOpts{
1050+
Name: "cortex_compactor_blocks_marked_for_deletion_total",
1051+
Help: "Total number of blocks marked for deletion in compactor.",
1052+
},
1053+
[]string{"user", "reason"},
1054+
)
1055+
remainingPlannedCompactions := promauto.With(reg).NewGaugeVec(
1056+
prometheus.GaugeOpts{
1057+
Name: "cortex_compactor_remaining_planned_compactions",
1058+
Help: "Total number of remaining planned compactions.",
1059+
},
1060+
[]string{"user"},
1061+
)
1062+
1063+
// Create the blocks cleaner
1064+
cleaner := NewBlocksCleaner(
1065+
BlocksCleanerConfig{
1066+
BlockRanges: (&tsdb.DurationList{
1067+
2 * time.Hour,
1068+
12 * time.Hour,
1069+
}).ToMilliseconds(),
1070+
},
1071+
nil, // bucket not needed
1072+
nil, // usersScanner not needed
1073+
0,
1074+
&mockConfigProvider{
1075+
parquetConverterEnabled: map[string]bool{
1076+
"user1": true,
1077+
},
1078+
},
1079+
log.NewNopLogger(),
1080+
"test",
1081+
reg,
1082+
0,
1083+
0,
1084+
blocksMarkedForDeletion,
1085+
remainingPlannedCompactions,
1086+
)
1087+
1088+
// Create test blocks in the index
1089+
now := time.Now()
1090+
idx := &bucketindex.Index{
1091+
Blocks: bucketindex.Blocks{
1092+
{
1093+
ID: ulid.MustNew(ulid.Now(), rand.Reader),
1094+
MinTime: now.Add(-3 * time.Hour).UnixMilli(),
1095+
MaxTime: now.UnixMilli(),
1096+
Parquet: &parquet.ConverterMarkMeta{},
1097+
},
1098+
{
1099+
ID: ulid.MustNew(ulid.Now(), rand.Reader),
1100+
MinTime: now.Add(-3 * time.Hour).UnixMilli(),
1101+
MaxTime: now.UnixMilli(),
1102+
Parquet: nil,
1103+
},
1104+
{
1105+
ID: ulid.MustNew(ulid.Now(), rand.Reader),
1106+
MinTime: now.Add(-5 * time.Hour).UnixMilli(),
1107+
MaxTime: now.UnixMilli(),
1108+
Parquet: nil,
1109+
},
1110+
},
1111+
}
1112+
1113+
// Update metrics
1114+
cleaner.updateBucketMetrics("user1", true, idx, 0, 0)
1115+
1116+
// Verify metrics
1117+
require.NoError(t, prom_testutil.CollectAndCompare(cleaner.tenantParquetBlocks, strings.NewReader(`
1118+
# HELP cortex_bucket_parquet_blocks_count Total number of parquet blocks in the bucket. Blocks marked for deletion are included.
1119+
# TYPE cortex_bucket_parquet_blocks_count gauge
1120+
cortex_bucket_parquet_blocks_count{user="user1"} 1
1121+
`)))
1122+
1123+
require.NoError(t, prom_testutil.CollectAndCompare(cleaner.tenantParquetUnConvertedBlocks, strings.NewReader(`
1124+
# HELP cortex_bucket_parquet_unconverted_blocks_count Total number of unconverted parquet blocks in the bucket. Blocks marked for deletion are included.
1125+
# TYPE cortex_bucket_parquet_unconverted_blocks_count gauge
1126+
cortex_bucket_parquet_unconverted_blocks_count{user="user1"} 2
1127+
`)))
1128+
}
1129+
10241130
type mockConfigProvider struct {
10251131
userRetentionPeriods map[string]time.Duration
10261132
parquetConverterEnabled map[string]bool

pkg/compactor/compactor.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -752,6 +752,7 @@ func (c *Compactor) starting(ctx context.Context) error {
752752
TenantCleanupDelay: c.compactorCfg.TenantCleanupDelay,
753753
ShardingStrategy: c.compactorCfg.ShardingStrategy,
754754
CompactionStrategy: c.compactorCfg.CompactionStrategy,
755+
BlockRanges: c.compactorCfg.BlockRanges.ToMilliseconds(),
755756
}, cleanerBucketClient, cleanerUsersScanner, c.compactorCfg.CompactionVisitMarkerTimeout, c.limits, c.parentLogger, cleanerRingLifecyclerID, c.registerer, c.compactorCfg.CleanerVisitMarkerTimeout, c.compactorCfg.CleanerVisitMarkerFileUpdateInterval,
756757
c.compactorMetrics.syncerBlocksMarkedForDeletion, c.compactorMetrics.remainingPlannedCompactions)
757758

0 commit comments

Comments
 (0)