Skip to content

blocksconvert: add a mapping for tenant IDs #4740

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@
* [BUGFIX] Distributor: Fix race condition on `/series` introduced by #4683. #4716
* [BUGFIX] Distributor: Fix a memory leak in distributor due to the cluster label. #4739

## Blocksconvert

* [ENHANCEMENT] Add a mapping for tenant IDs from chunks to blocks. #4740

## 1.11.0 2021-11-25

* [CHANGE] Memberlist: Expose default configuration values to the command line options. Note that setting these explicitly to zero will no longer cause the default to be used. If the default is desired, then do set the option. The following are affected: #4276
Expand Down
3 changes: 3 additions & 0 deletions docs/blocks-storage/convert-stored-chunks-to-blocks.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ Tools are:
All tools start HTTP server (see `-server.http*` options) exposing the `/metrics` endpoint.
All tools also start gRPC server (`-server.grpc*` options), but only Scheduler exposes services on it.

If you need the blocks to use different tenant IDs than the chunks had — for example, because you are migrating data to a hosted provider — map them with `-builder.tenant-id-map`.

### Scanner

Scanner is started by running `blocksconvert -target=scanner`. Scanner requires configuration for accessing Cortex Index:
Expand Down Expand Up @@ -82,6 +84,7 @@ Builder is started by `blocksconvert -target=builder`. It needs to be configured
- `-gcs.bucketname` – when using GCS as chunks store (other chunks backend storages, like S3, are supported as well)
- `-blocks-storage.*` - blocks storage configuration
- `-builder.output-dir` - Local directory where Builder keeps the block while it is being built. Once block is uploaded to blocks storage, it is deleted from local directory.
- `-builder.tenant-id-map` - Path to a file mapping chunk tenant IDs to block tenant IDs, in YAML map format. Note that when a map is supplied, encountering a tenant ID with no mapping is an error; use `-scheduler.allowed-users` if you need to exclude some tenant IDs from the transfer entirely.

Multiple builders may run at the same time, each builder will receive different plan to work on from scheduler.
Builders are CPU intensive (decoding and merging chunks), and require fast disk IO for writing blocks.
Expand Down
34 changes: 32 additions & 2 deletions tools/blocksconvert/builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/objstore"
"golang.org/x/sync/errgroup"
"gopkg.in/yaml.v2"

"github.com/cortexproject/cortex/pkg/chunk"
"github.com/cortexproject/cortex/pkg/chunk/cache"
Expand All @@ -45,6 +46,8 @@ type Config struct {
SeriesBatchSize int
TimestampTolerance time.Duration

TenantIDMapFileName string

PlanProcessorConfig planprocessor.Config
}

Expand All @@ -58,6 +61,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.BoolVar(&cfg.DeleteLocalBlock, "builder.delete-local-blocks", true, "Delete local files after uploading block.")
f.IntVar(&cfg.SeriesBatchSize, "builder.series-batch-size", defaultSeriesBatchSize, "Number of series to keep in memory before batch-write to temp file. Lower to decrease memory usage during the block building.")
f.DurationVar(&cfg.TimestampTolerance, "builder.timestamp-tolerance", 0, "Adjust sample timestamps by up to this to align them to an exact number of seconds apart.")
f.StringVar(&cfg.TenantIDMapFileName, "builder.tenant-id-map", "", "Path to file listing mapping of chunk tenant IDs to block tenant IDs")
}

func NewBuilder(cfg Config, scfg blocksconvert.SharedConfig, l log.Logger, reg prometheus.Registerer) (services.Service, error) {
Expand All @@ -66,6 +70,21 @@ func NewBuilder(cfg Config, scfg blocksconvert.SharedConfig, l log.Logger, reg p
return nil, errors.Wrap(err, "failed to load schema")
}

var tenantIDMap map[string]string
if cfg.TenantIDMapFileName != "" {
f, err := os.Open(cfg.TenantIDMapFileName)
if err != nil {
return nil, errors.Wrapf(err, "failed to open mapping file %q", cfg.TenantIDMapFileName)
}

decoder := yaml.NewDecoder(f)
decoder.SetStrict(true)
err = decoder.Decode(&tenantIDMap)
if err != nil {
return nil, errors.Wrapf(err, "reading mapping file")
}
}

bucketClient, err := scfg.GetBucket(l, reg)
if err != nil {
return nil, err
Expand All @@ -84,6 +103,7 @@ func NewBuilder(cfg Config, scfg blocksconvert.SharedConfig, l log.Logger, reg p
bucketClient: bucketClient,
schemaConfig: scfg.SchemaConfig,
storageConfig: scfg.StorageConfig,
tenantIDMap: tenantIDMap,

fetchedChunks: promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "cortex_blocksconvert_builder_fetched_chunks_total",
Expand Down Expand Up @@ -129,6 +149,8 @@ type Builder struct {
schemaConfig chunk.SchemaConfig
storageConfig storage.Config

tenantIDMap map[string]string

fetchedChunks prometheus.Counter
fetchedChunksSize prometheus.Counter
processedSeries prometheus.Counter
Expand Down Expand Up @@ -198,6 +220,14 @@ func (p *builderProcessor) ProcessPlanEntries(ctx context.Context, planEntryCh c
return "", errors.Wrap(err, "failed to create chunk fetcher")
}

tenantID := p.userID
if p.builder.tenantIDMap != nil {
tenantID = p.builder.tenantIDMap[tenantID]
if tenantID == "" {
return "", errors.Errorf("tenant ID %q had no mapping", p.userID)
}
}

tsdbBuilder, err := newTsdbBuilder(p.builder.cfg.OutputDirectory, p.dayStart, p.dayEnd, p.builder.cfg.TimestampTolerance, p.builder.cfg.SeriesBatchSize, p.log,
p.builder.processedSeries, p.builder.writtenSamples, p.builder.seriesInMemory)
if err != nil {
Expand All @@ -217,7 +247,7 @@ func (p *builderProcessor) ProcessPlanEntries(ctx context.Context, planEntryCh c

// Finish block.
ulid, err := tsdbBuilder.finishBlock("blocksconvert", map[string]string{
cortex_tsdb.TenantIDExternalLabel: p.userID,
cortex_tsdb.TenantIDExternalLabel: tenantID,
})
if err != nil {
return "", errors.Wrap(err, "failed to finish block building")
Expand All @@ -234,7 +264,7 @@ func (p *builderProcessor) ProcessPlanEntries(ctx context.Context, planEntryCh c

if p.builder.cfg.UploadBlock {
// No per-tenant config provider because the blocksconvert tool doesn't support it.
userBucket := bucket.NewUserBucketClient(p.userID, p.builder.bucketClient, nil)
userBucket := bucket.NewUserBucketClient(tenantID, p.builder.bucketClient, nil)

err := uploadBlock(ctx, p.log, userBucket, blockDir)
if err != nil {
Expand Down