Skip to content

add parquet labels cache #6835

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion integration/parquet_querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/thanos-io/thanos/pkg/block/metadata"

"github.com/cortexproject/cortex/integration/e2e"
e2ecache "github.com/cortexproject/cortex/integration/e2e/cache"
e2edb "github.com/cortexproject/cortex/integration/e2e/db"
"github.com/cortexproject/cortex/integration/e2ecortex"
"github.com/cortexproject/cortex/pkg/storage/bucket"
Expand All @@ -35,7 +36,8 @@ func TestParquetFuzz(t *testing.T) {
defer s.Close()

consul := e2edb.NewConsulWithName("consul")
require.NoError(t, s.StartAndWaitReady(consul))
memcached := e2ecache.NewMemcached()
require.NoError(t, s.StartAndWaitReady(consul, memcached))

baseFlags := mergeFlags(AlertmanagerLocalFlags(), BlocksStorageFlags())
flags := mergeFlags(
Expand Down Expand Up @@ -72,6 +74,11 @@ func TestParquetFuzz(t *testing.T) {
"-parquet-converter.enabled": "true",
// Querier
"-querier.enable-parquet-queryable": "true",
// Enable cache for parquet labels and chunks
"-blocks-storage.bucket-store.parquet-labels-cache.backend": "inmemory,memcached",
"-blocks-storage.bucket-store.parquet-labels-cache.memcached.addresses": "dns+" + memcached.NetworkEndpoint(e2ecache.MemcachedPort),
"-blocks-storage.bucket-store.chunks-cache.backend": "inmemory,memcached",
"-blocks-storage.bucket-store.chunks-cache.memcached.addresses": "dns+" + memcached.NetworkEndpoint(e2ecache.MemcachedPort),
},
)

Expand Down
2 changes: 1 addition & 1 deletion pkg/querier/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func createCachingBucketClient(ctx context.Context, storageCfg cortex_tsdb.Block

// Blocks finder doesn't use chunks, but we pass config for consistency.
matchers := cortex_tsdb.NewMatchers()
cachingBucket, err := cortex_tsdb.CreateCachingBucket(storageCfg.BucketStore.ChunksCache, storageCfg.BucketStore.MetadataCache, matchers, bucketClient, logger, extprom.WrapRegistererWith(prometheus.Labels{"component": name}, reg))
cachingBucket, err := cortex_tsdb.CreateCachingBucket(storageCfg.BucketStore.ChunksCache, storageCfg.BucketStore.MetadataCache, storageCfg.BucketStore.ParquetLabelsCache, matchers, bucketClient, logger, extprom.WrapRegistererWith(prometheus.Labels{"component": name}, reg))
if err != nil {
return nil, errors.Wrap(err, "create caching bucket")
}
Expand Down
55 changes: 54 additions & 1 deletion pkg/storage/tsdb/caching_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,39 @@ func (cfg *MetadataCacheConfig) Validate() error {
return cfg.BucketCacheBackend.Validate()
}

func CreateCachingBucket(chunksConfig ChunksCacheConfig, metadataConfig MetadataCacheConfig, matchers Matchers, bkt objstore.InstrumentedBucket, logger log.Logger, reg prometheus.Registerer) (objstore.InstrumentedBucket, error) {
// ParquetLabelsCacheConfig holds the settings of the parquet labels cache:
// the embedded cache backend selection plus the subrange and TTL options
// registered by RegisterFlagsWithPrefix.
type ParquetLabelsCacheConfig struct {
// BucketCacheBackend is embedded and inlined into the YAML, so its fields
// sit at the same level as the ones below.
BucketCacheBackend `yaml:",inline"`

// SubrangeSize is the size of each subrange a bucket object is split into.
SubrangeSize int64 `yaml:"subrange_size"`
// MaxGetRangeRequests caps the number of sub-GetRange requests a single
// GetRange request can be split into; zero or negative means unlimited.
MaxGetRangeRequests int `yaml:"max_get_range_requests"`
// AttributesTTL is the TTL for cached object attributes of parquet labels files.
AttributesTTL time.Duration `yaml:"attributes_ttl"`
// SubrangeTTL is the TTL for individual cached subranges.
SubrangeTTL time.Duration `yaml:"subrange_ttl"`
}

// RegisterFlagsWithPrefix registers every parquet labels cache flag on f.
// Each flag name is prefix followed by the option name; the per-backend
// options are registered under their own sub-prefix.
func (cfg *ParquetLabelsCacheConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix string) {
	// Usage text for the backend selector, covering single and multi level setups.
	backendUsage := fmt.Sprintf("The parquet labels cache backend type. Single or Multiple cache backend can be provided. "+
		"Supported values in single cache: %s, %s, %s, and '' (disable). "+
		"Supported values in multi level cache: a comma-separated list of (%s)",
		CacheBackendMemcached,
		CacheBackendRedis,
		CacheBackendInMemory,
		strings.Join(supportedBucketCacheBackends, ", "),
	)
	f.StringVar(&cfg.Backend, prefix+"backend", "", backendUsage)

	// Backend-specific options.
	cfg.Memcached.RegisterFlagsWithPrefix(f, prefix+"memcached.")
	cfg.Redis.RegisterFlagsWithPrefix(f, prefix+"redis.")
	cfg.InMemory.RegisterFlagsWithPrefix(f, prefix+"inmemory.", "parquet-labels")
	cfg.MultiLevel.RegisterFlagsWithPrefix(f, prefix+"multilevel.")

	// Subrange splitting and TTL options.
	f.Int64Var(&cfg.SubrangeSize, prefix+"subrange-size", 16000, "Size of each subrange that bucket object is split into for better caching.")
	f.IntVar(&cfg.MaxGetRangeRequests, prefix+"max-get-range-requests", 3, "Maximum number of sub-GetRange requests that a single GetRange request can be split into when fetching parquet labels file. Zero or negative value = unlimited number of sub-requests.")
	f.DurationVar(&cfg.AttributesTTL, prefix+"attributes-ttl", 168*time.Hour, "TTL for caching object attributes for parquet labels file.")
	f.DurationVar(&cfg.SubrangeTTL, prefix+"subrange-ttl", 24*time.Hour, "TTL for caching individual subranges.")

	// The multi level cache reuses the subrange TTL as its backfill TTL.
	// NOTE(review): this copies the value SubrangeTTL holds at registration
	// time (the flag default) — confirm whether a later override of the
	// subrange TTL is expected to propagate to BackFillTTL.
	cfg.MultiLevel.BackFillTTL = cfg.SubrangeTTL
}

// Validate checks the embedded cache backend configuration and returns
// whatever error that check reports.
func (cfg *ParquetLabelsCacheConfig) Validate() error {
	backend := &cfg.BucketCacheBackend
	return backend.Validate()
}

func CreateCachingBucket(chunksConfig ChunksCacheConfig, metadataConfig MetadataCacheConfig, parquetLabelsConfig ParquetLabelsCacheConfig, matchers Matchers, bkt objstore.InstrumentedBucket, logger log.Logger, reg prometheus.Registerer) (objstore.InstrumentedBucket, error) {
cfg := cache.NewCachingBucketConfig()
cachingConfigured := false

Expand Down Expand Up @@ -221,6 +253,16 @@ func CreateCachingBucket(chunksConfig ChunksCacheConfig, metadataConfig Metadata
cfg.CacheIter("chunks-iter", metadataCache, matchers.GetChunksIterMatcher(), metadataConfig.ChunksListTTL, codec, "")
}

parquetLabelsCache, err := createBucketCache("parquet-labels-cache", &parquetLabelsConfig.BucketCacheBackend, logger, reg)
if err != nil {
return nil, errors.Wrapf(err, "parquet-labels-cache")
}
if parquetLabelsCache != nil {
cachingConfigured = true
parquetLabelsCache = cache.NewTracingCache(parquetLabelsCache)
cfg.CacheGetRange("parquet-labels", parquetLabelsCache, matchers.GetParquetLabelsMatcher(), parquetLabelsConfig.SubrangeSize, parquetLabelsConfig.AttributesTTL, parquetLabelsConfig.SubrangeTTL, parquetLabelsConfig.MaxGetRangeRequests)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For both this and the chunks cache, should we also cache the attributes?

}

if !cachingConfigured {
// No caching is configured.
return bkt, nil
Expand Down Expand Up @@ -316,6 +358,7 @@ func NewMatchers() Matchers {
matcherMap := make(map[string]func(string) bool)
matcherMap["chunks"] = isTSDBChunkFile
matcherMap["parquet-chunks"] = isParquetChunkFile
matcherMap["parquet-labels"] = isParquetLabelsFile
matcherMap["metafile"] = isMetaFile
matcherMap["block-index"] = isBlockIndexFile
matcherMap["bucket-index"] = isBucketIndexFiles
Expand All @@ -339,6 +382,10 @@ func (m *Matchers) SetParquetChunksMatcher(f func(string) bool) {
m.matcherMap["parquet-chunks"] = f
}

// SetParquetLabelsMatcher stores f as the matcher registered under the
// "parquet-labels" key, replacing any previous one.
func (m *Matchers) SetParquetLabelsMatcher(f func(string) bool) {
	const key = "parquet-labels"
	m.matcherMap[key] = f
}

// SetBlockIndexMatcher stores f as the matcher registered under the
// "block-index" key, replacing any previous one.
func (m *Matchers) SetBlockIndexMatcher(f func(string) bool) {
	const key = "block-index"
	m.matcherMap[key] = f
}
Expand Down Expand Up @@ -367,6 +414,10 @@ func (m *Matchers) GetParquetChunksMatcher() func(string) bool {
return m.matcherMap["parquet-chunks"]
}

// GetParquetLabelsMatcher returns the matcher registered under the
// "parquet-labels" key, or nil when none is registered.
func (m *Matchers) GetParquetLabelsMatcher() func(string) bool {
	matcher := m.matcherMap["parquet-labels"]
	return matcher
}

// GetMetafileMatcher returns the matcher registered under the "metafile"
// key, or nil when none is registered.
func (m *Matchers) GetMetafileMatcher() func(string) bool {
	matcher := m.matcherMap["metafile"]
	return matcher
}
Expand Down Expand Up @@ -397,6 +448,8 @@ func isTSDBChunkFile(name string) bool { return chunksMatcher.MatchString(name)

// isParquetChunkFile reports whether name ends with "chunks.parquet".
func isParquetChunkFile(name string) bool {
	const suffix = "chunks.parquet"
	return strings.HasSuffix(name, suffix)
}

// isParquetLabelsFile reports whether name ends with "labels.parquet".
func isParquetLabelsFile(name string) bool {
	const suffix = "labels.parquet"
	return strings.HasSuffix(name, suffix)
}

// isMetaFile reports whether name ends with "/" followed by the block meta
// filename, the block deletion mark filename, or the tenant deletion mark
// file. The suffixes are checked in that order.
func isMetaFile(name string) bool {
	switch {
	case strings.HasSuffix(name, "/"+metadata.MetaFilename),
		strings.HasSuffix(name, "/"+metadata.DeletionMarkFilename),
		strings.HasSuffix(name, "/"+TenantDeletionMarkFile):
		return true
	default:
		return false
	}
}
Expand Down
40 changes: 23 additions & 17 deletions pkg/storage/tsdb/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,23 +274,24 @@ func (cfg *TSDBConfig) IsBlocksShippingEnabled() bool {

// BucketStoreConfig holds the config information for Bucket Stores used by the querier and store-gateway.
type BucketStoreConfig struct {
SyncDir string `yaml:"sync_dir"`
SyncInterval time.Duration `yaml:"sync_interval"`
MaxConcurrent int `yaml:"max_concurrent"`
MaxInflightRequests int `yaml:"max_inflight_requests"`
TenantSyncConcurrency int `yaml:"tenant_sync_concurrency"`
BlockSyncConcurrency int `yaml:"block_sync_concurrency"`
MetaSyncConcurrency int `yaml:"meta_sync_concurrency"`
ConsistencyDelay time.Duration `yaml:"consistency_delay"`
IndexCache IndexCacheConfig `yaml:"index_cache"`
ChunksCache ChunksCacheConfig `yaml:"chunks_cache"`
MetadataCache MetadataCacheConfig `yaml:"metadata_cache"`
MatchersCacheMaxItems int `yaml:"matchers_cache_max_items"`
IgnoreDeletionMarksDelay time.Duration `yaml:"ignore_deletion_mark_delay"`
IgnoreBlocksWithin time.Duration `yaml:"ignore_blocks_within"`
IgnoreBlocksBefore time.Duration `yaml:"ignore_blocks_before"`
BucketIndex BucketIndexConfig `yaml:"bucket_index"`
BlockDiscoveryStrategy string `yaml:"block_discovery_strategy"`
SyncDir string `yaml:"sync_dir"`
SyncInterval time.Duration `yaml:"sync_interval"`
MaxConcurrent int `yaml:"max_concurrent"`
MaxInflightRequests int `yaml:"max_inflight_requests"`
TenantSyncConcurrency int `yaml:"tenant_sync_concurrency"`
BlockSyncConcurrency int `yaml:"block_sync_concurrency"`
MetaSyncConcurrency int `yaml:"meta_sync_concurrency"`
ConsistencyDelay time.Duration `yaml:"consistency_delay"`
IndexCache IndexCacheConfig `yaml:"index_cache"`
ChunksCache ChunksCacheConfig `yaml:"chunks_cache"`
MetadataCache MetadataCacheConfig `yaml:"metadata_cache"`
ParquetLabelsCache ParquetLabelsCacheConfig `yaml:"parquet_labels_cache" doc:"hidden"`
MatchersCacheMaxItems int `yaml:"matchers_cache_max_items"`
IgnoreDeletionMarksDelay time.Duration `yaml:"ignore_deletion_mark_delay"`
IgnoreBlocksWithin time.Duration `yaml:"ignore_blocks_within"`
IgnoreBlocksBefore time.Duration `yaml:"ignore_blocks_before"`
BucketIndex BucketIndexConfig `yaml:"bucket_index"`
BlockDiscoveryStrategy string `yaml:"block_discovery_strategy"`

// Chunk pool.
MaxChunkPoolBytes uint64 `yaml:"max_chunk_pool_bytes"`
Expand Down Expand Up @@ -348,6 +349,7 @@ func (cfg *BucketStoreConfig) RegisterFlags(f *flag.FlagSet) {
cfg.IndexCache.RegisterFlagsWithPrefix(f, "blocks-storage.bucket-store.index-cache.")
cfg.ChunksCache.RegisterFlagsWithPrefix(f, "blocks-storage.bucket-store.chunks-cache.")
cfg.MetadataCache.RegisterFlagsWithPrefix(f, "blocks-storage.bucket-store.metadata-cache.")
cfg.ParquetLabelsCache.RegisterFlagsWithPrefix(f, "blocks-storage.bucket-store.parquet-labels-cache.")
cfg.BucketIndex.RegisterFlagsWithPrefix(f, "blocks-storage.bucket-store.bucket-index.")

f.StringVar(&cfg.SyncDir, "blocks-storage.bucket-store.sync-dir", "tsdb-sync", "Directory to store synchronized TSDB index headers.")
Expand Down Expand Up @@ -403,6 +405,10 @@ func (cfg *BucketStoreConfig) Validate() error {
if err != nil {
return errors.Wrap(err, "metadata-cache configuration")
}
err = cfg.ParquetLabelsCache.Validate()
if err != nil {
return errors.Wrap(err, "parquet-labels-cache configuration")
}
if !util.StringsContain(supportedBlockDiscoveryStrategies, cfg.BlockDiscoveryStrategy) {
return ErrInvalidBucketIndexBlockDiscoveryStrategy
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/storage/tsdb/multilevel_bucket_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ func newMultiLevelBucketCache(name string, cfg MultiLevelBucketCacheConfig, reg
case "metadata-cache":
itemName = "metadata_cache"
metricHelpText = "metadata cache"
case "parquet-labels-cache":
itemName = "parquet_labels_cache"
metricHelpText = "parquet labels cache"
default:
itemName = name
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/storegateway/bucket_stores.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ var ErrTooManyInflightRequests = status.Error(codes.ResourceExhausted, "too many
// NewBucketStores makes a new BucketStores.
func NewBucketStores(cfg tsdb.BlocksStorageConfig, shardingStrategy ShardingStrategy, bucketClient objstore.InstrumentedBucket, limits *validation.Overrides, logLevel logging.Level, logger log.Logger, reg prometheus.Registerer) (*BucketStores, error) {
matchers := tsdb.NewMatchers()
cachingBucket, err := tsdb.CreateCachingBucket(cfg.BucketStore.ChunksCache, cfg.BucketStore.MetadataCache, matchers, bucketClient, logger, reg)
cachingBucket, err := tsdb.CreateCachingBucket(cfg.BucketStore.ChunksCache, cfg.BucketStore.MetadataCache, tsdb.ParquetLabelsCacheConfig{}, matchers, bucketClient, logger, reg)
if err != nil {
return nil, errors.Wrapf(err, "create caching bucket")
}
Expand Down