Skip to content

Commit 6dbc78c

Browse files
authored
Allow store gateway to ignore syncing blocks older than certain time (#6830)
* allow store gateway to ignore syncing blocks older than certain time
  Signed-off-by: Ben Ye <[email protected]>
* changelog
  Signed-off-by: yeya24 <[email protected]>
* fix handling default value 0
  Signed-off-by: yeya24 <[email protected]>
* fix
  Signed-off-by: yeya24 <[email protected]>
* refactor code a bit
  Signed-off-by: yeya24 <[email protected]>
* update unit test
  Signed-off-by: yeya24 <[email protected]>
---------
Signed-off-by: Ben Ye <[email protected]>
Signed-off-by: yeya24 <[email protected]>
1 parent 5e6fe71 commit 6dbc78c

File tree

7 files changed

+107
-1
lines changed

7 files changed

+107
-1
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
* [ENHANCEMENT] Compactor: Optimize cleaner run time. #6815
4444
* [ENHANCEMENT] Parquet Storage: Allow percentage based dynamic shard size for Parquet Converter. #6817
4545
* [ENHANCEMENT] Query Frontend: Enhance the performance of the JSON codec. #6816
46+
* [ENHANCEMENT] Store Gateway: Allow ignoring blocks older than a certain time during sync using `ignore_blocks_before`. #6830
4647
* [BUGFIX] Ingester: Avoid error or early throttling when READONLY ingesters are present in the ring #6517
4748
* [BUGFIX] Ingester: Fix labelset data race condition. #6573
4849
* [BUGFIX] Compactor: Cleaner should not put deletion marker for blocks with no-compact marker. #6576

docs/blocks-storage/querier.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1377,6 +1377,11 @@ blocks_storage:
13771377
# CLI flag: -blocks-storage.bucket-store.ignore-blocks-within
13781378
[ignore_blocks_within: <duration> | default = 0s]
13791379

1380+
# The blocks created before `now() - ignore_blocks_before` will not be
1381+
# synced. 0 to disable.
1382+
# CLI flag: -blocks-storage.bucket-store.ignore-blocks-before
1383+
[ignore_blocks_before: <duration> | default = 0s]
1384+
13801385
bucket_index:
13811386
# True to enable querier and store-gateway to discover blocks in the
13821387
# storage via bucket index instead of bucket scanning.

docs/blocks-storage/store-gateway.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1498,6 +1498,11 @@ blocks_storage:
14981498
# CLI flag: -blocks-storage.bucket-store.ignore-blocks-within
14991499
[ignore_blocks_within: <duration> | default = 0s]
15001500

1501+
# The blocks created before `now() - ignore_blocks_before` will not be
1502+
# synced. 0 to disable.
1503+
# CLI flag: -blocks-storage.bucket-store.ignore-blocks-before
1504+
[ignore_blocks_before: <duration> | default = 0s]
1505+
15011506
bucket_index:
15021507
# True to enable querier and store-gateway to discover blocks in the
15031508
# storage via bucket index instead of bucket scanning.

docs/configuration/config-file-reference.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1952,6 +1952,11 @@ bucket_store:
19521952
# CLI flag: -blocks-storage.bucket-store.ignore-blocks-within
19531953
[ignore_blocks_within: <duration> | default = 0s]
19541954

1955+
# The blocks created before `now() - ignore_blocks_before` will not be synced.
1956+
# 0 to disable.
1957+
# CLI flag: -blocks-storage.bucket-store.ignore-blocks-before
1958+
[ignore_blocks_before: <duration> | default = 0s]
1959+
19551960
bucket_index:
19561961
# True to enable querier and store-gateway to discover blocks in the storage
19571962
# via bucket index instead of bucket scanning.

pkg/storage/tsdb/config.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -288,6 +288,7 @@ type BucketStoreConfig struct {
288288
MatchersCacheMaxItems int `yaml:"matchers_cache_max_items"`
289289
IgnoreDeletionMarksDelay time.Duration `yaml:"ignore_deletion_mark_delay"`
290290
IgnoreBlocksWithin time.Duration `yaml:"ignore_blocks_within"`
291+
IgnoreBlocksBefore time.Duration `yaml:"ignore_blocks_before"`
291292
BucketIndex BucketIndexConfig `yaml:"bucket_index"`
292293
BlockDiscoveryStrategy string `yaml:"block_discovery_strategy"`
293294

@@ -364,6 +365,7 @@ func (cfg *BucketStoreConfig) RegisterFlags(f *flag.FlagSet) {
364365
"The idea of ignore-deletion-marks-delay is to ignore blocks that are marked for deletion with some delay. This ensures store can still serve blocks that are meant to be deleted but do not have a replacement yet. "+
365366
"Default is 6h, half of the default value for -compactor.deletion-delay.")
366367
f.DurationVar(&cfg.IgnoreBlocksWithin, "blocks-storage.bucket-store.ignore-blocks-within", 0, "The blocks created since `now() - ignore_blocks_within` will not be synced. This should be used together with `-querier.query-store-after` to filter out the blocks that are too new to be queried. A reasonable value for this flag would be `-querier.query-store-after - blocks-storage.bucket-store.bucket-index.max-stale-period` to give some buffer. 0 to disable.")
368+
f.DurationVar(&cfg.IgnoreBlocksBefore, "blocks-storage.bucket-store.ignore-blocks-before", 0, "The blocks created before `now() - ignore_blocks_before` will not be synced. 0 to disable.")
367369
f.IntVar(&cfg.PostingOffsetsInMemSampling, "blocks-storage.bucket-store.posting-offsets-in-mem-sampling", store.DefaultPostingOffsetInMemorySampling, "Controls what is the ratio of postings offsets that the store will hold in memory.")
368370
f.BoolVar(&cfg.IndexHeaderLazyLoadingEnabled, "blocks-storage.bucket-store.index-header-lazy-loading-enabled", false, "If enabled, store-gateway will lazily memory-map an index-header only once required by a query.")
369371
f.DurationVar(&cfg.IndexHeaderLazyLoadingIdleTimeout, "blocks-storage.bucket-store.index-header-lazy-loading-idle-timeout", 20*time.Minute, "If index-header lazy loading is enabled and this setting is > 0, the store-gateway will release memory-mapped index-headers after 'idle timeout' inactivity.")

pkg/storegateway/bucket_stores.go

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,14 @@ import (
1515
"github.com/pkg/errors"
1616
"github.com/prometheus/client_golang/prometheus"
1717
"github.com/prometheus/client_golang/prometheus/promauto"
18+
"github.com/prometheus/common/model"
1819
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
1920
"github.com/thanos-io/objstore"
2021
"github.com/thanos-io/thanos/pkg/block"
2122
thanos_metadata "github.com/thanos-io/thanos/pkg/block/metadata"
2223
"github.com/thanos-io/thanos/pkg/extprom"
2324
"github.com/thanos-io/thanos/pkg/gate"
25+
thanos_model "github.com/thanos-io/thanos/pkg/model"
2426
"github.com/thanos-io/thanos/pkg/pool"
2527
"github.com/thanos-io/thanos/pkg/store"
2628
storecache "github.com/thanos-io/thanos/pkg/store/cache"
@@ -548,7 +550,20 @@ func (u *BucketStores) getOrCreateStore(userID string) (*store.BucketStore, erro
548550
fetcherReg := prometheus.NewRegistry()
549551

550552
// The sharding strategy filter MUST be before the ones we create here (order matters).
551-
filters := append([]block.MetadataFilter{NewShardingMetadataFilterAdapter(userID, u.shardingStrategy)}, []block.MetadataFilter{
553+
filters := []block.MetadataFilter{NewShardingMetadataFilterAdapter(userID, u.shardingStrategy)}
554+
555+
if u.cfg.BucketStore.IgnoreBlocksBefore > 0 {
556+
// We don't want to filter out any blocks for max time.
557+
// Set a positive duration so we can always load blocks till now.
558+
// Too-recent blocks are governed by the separate IgnoreBlocksWithin setting.
559+
filterMaxTimeDuration := model.Duration(time.Second)
560+
filterMinTime := thanos_model.TimeOrDurationValue{}
561+
ignoreBlocksBefore := -model.Duration(u.cfg.BucketStore.IgnoreBlocksBefore)
562+
filterMinTime.Dur = &ignoreBlocksBefore
563+
filters = append(filters, block.NewTimePartitionMetaFilter(filterMinTime, thanos_model.TimeOrDurationValue{Dur: &filterMaxTimeDuration}))
564+
}
565+
566+
filters = append(filters, []block.MetadataFilter{
552567
block.NewConsistencyDelayMetaFilter(userLogger, u.cfg.BucketStore.ConsistencyDelay, fetcherReg),
553568
// Use our own custom implementation.
554569
NewIgnoreDeletionMarkFilter(userLogger, userBkt, u.cfg.BucketStore.IgnoreDeletionMarksDelay, u.cfg.BucketStore.MetaSyncConcurrency),

pkg/storegateway/bucket_stores_test.go

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -571,6 +571,79 @@ func TestBucketStores_Series_ShouldNotCheckMaxInflightRequestsIfTheLimitIsDisabl
571571
assert.Equal(t, 1, len(series))
572572
}
573573

574+
func TestBucketStores_SyncBlocksWithIgnoreBlocksBefore(t *testing.T) {
575+
t.Parallel()
576+
577+
const userID = "user-1"
578+
const metricName = "test_metric"
579+
580+
ctx := context.Background()
581+
cfg := prepareStorageConfig(t)
582+
583+
// Configure IgnoreBlocksBefore to filter out blocks older than 2 hours
584+
cfg.BucketStore.IgnoreBlocksBefore = 2 * time.Hour
585+
586+
storageDir := t.TempDir()
587+
588+
// Create blocks with different timestamps
589+
now := time.Now()
590+
591+
// Block 1: Very old block (should be ignored - time-excluded)
592+
oldBlockTime := now.Add(-5 * time.Hour)
593+
generateStorageBlock(t, storageDir, userID, metricName+"_old",
594+
oldBlockTime.UnixMilli(), oldBlockTime.Add(time.Hour).UnixMilli(), 15)
595+
596+
// Block 2: Recent block (should be synced)
597+
recentBlockTime := now.Add(-1 * time.Hour)
598+
generateStorageBlock(t, storageDir, userID, metricName+"_recent",
599+
recentBlockTime.UnixMilli(), recentBlockTime.Add(time.Hour).UnixMilli(), 15)
600+
601+
// Block 3: Current block (should be synced)
602+
currentBlockTime := now.Add(-30 * time.Minute)
603+
generateStorageBlock(t, storageDir, userID, metricName+"_current",
604+
currentBlockTime.UnixMilli(), currentBlockTime.Add(time.Hour).UnixMilli(), 15)
605+
606+
bucket, err := filesystem.NewBucketClient(filesystem.Config{Directory: storageDir})
607+
require.NoError(t, err)
608+
609+
reg := prometheus.NewPedanticRegistry()
610+
stores, err := NewBucketStores(cfg, NewNoShardingStrategy(log.NewNopLogger(), nil),
611+
objstore.WithNoopInstr(bucket), defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg)
612+
require.NoError(t, err)
613+
614+
// Perform initial sync
615+
require.NoError(t, stores.InitialSync(ctx))
616+
617+
// Verify that only recent and current blocks are loaded
618+
// The old block should be filtered out by IgnoreBlocksBefore (time-excluded)
619+
require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(`
620+
# HELP cortex_blocks_meta_synced Reflects current state of synced blocks (over all tenants).
621+
# TYPE cortex_blocks_meta_synced gauge
622+
cortex_blocks_meta_synced{state="corrupted-meta-json"} 0
623+
cortex_blocks_meta_synced{state="duplicate"} 0
624+
cortex_blocks_meta_synced{state="failed"} 0
625+
cortex_blocks_meta_synced{state="label-excluded"} 0
626+
cortex_blocks_meta_synced{state="loaded"} 2
627+
cortex_blocks_meta_synced{state="marked-for-deletion"} 0
628+
cortex_blocks_meta_synced{state="marked-for-no-compact"} 0
629+
cortex_blocks_meta_synced{state="no-meta-json"} 0
630+
cortex_blocks_meta_synced{state="time-excluded"} 1
631+
cortex_blocks_meta_synced{state="too-fresh"} 0
632+
# HELP cortex_blocks_meta_syncs_total Total blocks metadata synchronization attempts
633+
# TYPE cortex_blocks_meta_syncs_total counter
634+
cortex_blocks_meta_syncs_total 3
635+
# HELP cortex_bucket_store_blocks_meta_sync_failures_total Total blocks metadata synchronization failures
636+
# TYPE cortex_bucket_store_blocks_meta_sync_failures_total counter
637+
cortex_bucket_store_blocks_meta_sync_failures_total 0
638+
# HELP cortex_bucket_store_block_loads_total Total number of remote block loading attempts.
639+
# TYPE cortex_bucket_store_block_loads_total counter
640+
cortex_bucket_store_block_loads_total 2
641+
# HELP cortex_bucket_store_blocks_loaded Number of currently loaded blocks.
642+
# TYPE cortex_bucket_store_blocks_loaded gauge
643+
cortex_bucket_store_blocks_loaded{user="user-1"} 2
644+
`), "cortex_bucket_store_block_loads_total", "cortex_bucket_store_blocks_loaded", "cortex_blocks_meta_synced"))
645+
}
646+
574647
func prepareStorageConfig(t *testing.T) cortex_tsdb.BlocksStorageConfig {
575648
cfg := cortex_tsdb.BlocksStorageConfig{}
576649
flagext.DefaultValues(&cfg)

0 commit comments

Comments
 (0)