diff --git a/CHANGELOG.md b/CHANGELOG.md index 61159e50f13..b66ce9e7f06 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -37,6 +37,10 @@ * [BUGFIX] Distributor: Fix race condition on `/series` introduced by #4683. #4716 * [BUGFIX] Distributor: Fix a memory leak in distributor due to the cluster label. #4739 +## Blocksconvert + +* [ENHANCEMENT] Add a mapping for tenant IDs from chunks to blocks. #4740 + ## 1.11.0 2021-11-25 * [CHANGE] Memberlist: Expose default configuration values to the command line options. Note that setting these explicitly to zero will no longer cause the default to be used. If the default is desired, then do set the option. The following are affected: #4276 diff --git a/docs/blocks-storage/convert-stored-chunks-to-blocks.md b/docs/blocks-storage/convert-stored-chunks-to-blocks.md index fc6f98e16a8..673bad92bb0 100644 --- a/docs/blocks-storage/convert-stored-chunks-to-blocks.md +++ b/docs/blocks-storage/convert-stored-chunks-to-blocks.md @@ -29,6 +29,8 @@ Tools are: All tools start HTTP server (see `-server.http*` options) exposing the `/metrics` endpoint. All tools also start gRPC server (`-server.grpc*` options), but only Scheduler exposes services on it. +If you need to use different tenant IDs for blocks than you had for chunks, e.g. because you are moving data to a hosted provider, map them with `-builder.tenant-id-map`. + ### Scanner Scanner is started by running `blocksconvert -target=scanner`. Scanner requires configuration for accessing Cortex Index: @@ -82,6 +84,7 @@ Builder is started by `blocksconvert -target=builder`. It needs to be configured - `-gcs.bucketname` – when using GCS as chunks store (other chunks backend storages, like S3, are supported as well) - `-blocks-storage.*` - blocks storage configuration - `-builder.output-dir` - Local directory where Builder keeps the block while it is being built. Once block is uploaded to blocks storage, it is deleted from local directory. +- `-builder.tenant-id-map` - Path to file listing mapping of chunk tenant IDs to block tenant IDs. YAML map format. Note that an ID which is not mapped is an error; use `-scheduler.allowed-users` if you need to exclude some IDs from the transfer. Multiple builders may run at the same time, each builder will receive different plan to work on from scheduler. Builders are CPU intensive (decoding and merging chunks), and require fast disk IO for writing blocks. diff --git a/tools/blocksconvert/builder/builder.go b/tools/blocksconvert/builder/builder.go index 077f42036ad..aba2e63579d 100644 --- a/tools/blocksconvert/builder/builder.go +++ b/tools/blocksconvert/builder/builder.go @@ -20,6 +20,7 @@ import ( "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/objstore" "golang.org/x/sync/errgroup" + "gopkg.in/yaml.v2" "github.com/cortexproject/cortex/pkg/chunk" "github.com/cortexproject/cortex/pkg/chunk/cache" @@ -45,6 +46,8 @@ type Config struct { SeriesBatchSize int TimestampTolerance time.Duration + TenantIDMapFileName string + PlanProcessorConfig planprocessor.Config } @@ -58,6 +61,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.BoolVar(&cfg.DeleteLocalBlock, "builder.delete-local-blocks", true, "Delete local files after uploading block.") f.IntVar(&cfg.SeriesBatchSize, "builder.series-batch-size", defaultSeriesBatchSize, "Number of series to keep in memory before batch-write to temp file. Lower to decrease memory usage during the block building.") f.DurationVar(&cfg.TimestampTolerance, "builder.timestamp-tolerance", 0, "Adjust sample timestamps by up to this to align them to an exact number of seconds apart.") + f.StringVar(&cfg.TenantIDMapFileName, "builder.tenant-id-map", "", "Path to file listing mapping of chunk tenant IDs to block tenant IDs") } func NewBuilder(cfg Config, scfg blocksconvert.SharedConfig, l log.Logger, reg prometheus.Registerer) (services.Service, error) { @@ -66,6 +70,21 @@ func NewBuilder(cfg Config, scfg blocksconvert.SharedConfig, l log.Logger, reg p return nil, errors.Wrap(err, "failed to load schema") } + var tenantIDMap map[string]string + if cfg.TenantIDMapFileName != "" { + f, err := os.Open(cfg.TenantIDMapFileName) + if err != nil { + return nil, errors.Wrapf(err, "failed to open mapping file %q", cfg.TenantIDMapFileName) + } + + decoder := yaml.NewDecoder(f) + decoder.SetStrict(true) + err = decoder.Decode(&tenantIDMap) + if err != nil { + return nil, errors.Wrapf(err, "reading mapping file") + } + } + bucketClient, err := scfg.GetBucket(l, reg) if err != nil { return nil, err @@ -84,6 +103,7 @@ func NewBuilder(cfg Config, scfg blocksconvert.SharedConfig, l log.Logger, reg p bucketClient: bucketClient, schemaConfig: scfg.SchemaConfig, storageConfig: scfg.StorageConfig, + tenantIDMap: tenantIDMap, fetchedChunks: promauto.With(reg).NewCounter(prometheus.CounterOpts{ Name: "cortex_blocksconvert_builder_fetched_chunks_total", @@ -129,6 +149,8 @@ type Builder struct { schemaConfig chunk.SchemaConfig storageConfig storage.Config + tenantIDMap map[string]string + fetchedChunks prometheus.Counter fetchedChunksSize prometheus.Counter processedSeries prometheus.Counter @@ -198,6 +220,14 @@ func (p *builderProcessor) ProcessPlanEntries(ctx context.Context, planEntryCh c return "", errors.Wrap(err, "failed to create chunk fetcher") } + tenantID := p.userID + if p.builder.tenantIDMap != nil { + tenantID = p.builder.tenantIDMap[tenantID] + if tenantID == "" { + return "", errors.Errorf("tenant ID %q had no mapping", p.userID) + } + } + tsdbBuilder, err := newTsdbBuilder(p.builder.cfg.OutputDirectory, p.dayStart, p.dayEnd, p.builder.cfg.TimestampTolerance, p.builder.cfg.SeriesBatchSize, p.log, p.builder.processedSeries, p.builder.writtenSamples, p.builder.seriesInMemory) if err != nil { @@ -217,7 +247,7 @@ func (p *builderProcessor) ProcessPlanEntries(ctx context.Context, planEntryCh c // Finish block. ulid, err := tsdbBuilder.finishBlock("blocksconvert", map[string]string{ - cortex_tsdb.TenantIDExternalLabel: p.userID, + cortex_tsdb.TenantIDExternalLabel: tenantID, }) if err != nil { return "", errors.Wrap(err, "failed to finish block building") @@ -234,7 +264,7 @@ func (p *builderProcessor) ProcessPlanEntries(ctx context.Context, planEntryCh c if p.builder.cfg.UploadBlock { // No per-tenant config provider because the blocksconvert tool doesn't support it. - userBucket := bucket.NewUserBucketClient(p.userID, p.builder.bucketClient, nil) + userBucket := bucket.NewUserBucketClient(tenantID, p.builder.bucketClient, nil) err := uploadBlock(ctx, p.log, userBucket, blockDir) if err != nil {