Skip to content

Commit 61fe286

Browse files
authored
Do not count 404 errors when checking if tenant deletion markers exists (#5698)
Signed-off-by: Alan Protasio <[email protected]>
1 parent 23294aa commit 61fe286

19 files changed

+57
-47
lines changed

pkg/compactor/blocks_cleaner.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ type BlocksCleaner struct {
4343
cfg BlocksCleanerConfig
4444
cfgProvider ConfigProvider
4545
logger log.Logger
46-
bucketClient objstore.Bucket
46+
bucketClient objstore.InstrumentedBucket
4747
usersScanner *cortex_tsdb.UsersScanner
4848

4949
// Keep track of the last owned users.
@@ -64,7 +64,7 @@ type BlocksCleaner struct {
6464
tenantBucketIndexLastUpdate *prometheus.GaugeVec
6565
}
6666

67-
func NewBlocksCleaner(cfg BlocksCleanerConfig, bucketClient objstore.Bucket, usersScanner *cortex_tsdb.UsersScanner, cfgProvider ConfigProvider, logger log.Logger, reg prometheus.Registerer) *BlocksCleaner {
67+
func NewBlocksCleaner(cfg BlocksCleanerConfig, bucketClient objstore.InstrumentedBucket, usersScanner *cortex_tsdb.UsersScanner, cfgProvider ConfigProvider, logger log.Logger, reg prometheus.Registerer) *BlocksCleaner {
6868
c := &BlocksCleaner{
6969
cfg: cfg,
7070
bucketClient: bucketClient,

pkg/compactor/compactor.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -296,7 +296,7 @@ type Compactor struct {
296296

297297
// Functions that creates bucket client, grouper, planner and compactor using the context.
298298
// Useful for injecting mock objects from tests.
299-
bucketClientFactory func(ctx context.Context) (objstore.Bucket, error)
299+
bucketClientFactory func(ctx context.Context) (objstore.InstrumentedBucket, error)
300300
blocksGrouperFactory BlocksGrouperFactory
301301
blocksCompactorFactory BlocksCompactorFactory
302302

@@ -312,7 +312,7 @@ type Compactor struct {
312312
blocksPlannerFactory PlannerFactory
313313

314314
// Client used to run operations on the bucket storing blocks.
315-
bucketClient objstore.Bucket
315+
bucketClient objstore.InstrumentedBucket
316316

317317
// Ring used for sharding compactions.
318318
ringLifecycler *ring.Lifecycler
@@ -345,7 +345,7 @@ type Compactor struct {
345345

346346
// NewCompactor makes a new Compactor.
347347
func NewCompactor(compactorCfg Config, storageCfg cortex_tsdb.BlocksStorageConfig, logger log.Logger, registerer prometheus.Registerer, limits *validation.Overrides) (*Compactor, error) {
348-
bucketClientFactory := func(ctx context.Context) (objstore.Bucket, error) {
348+
bucketClientFactory := func(ctx context.Context) (objstore.InstrumentedBucket, error) {
349349
return bucket.NewClient(ctx, storageCfg.Bucket, "compactor", logger, registerer)
350350
}
351351

@@ -380,7 +380,7 @@ func newCompactor(
380380
storageCfg cortex_tsdb.BlocksStorageConfig,
381381
logger log.Logger,
382382
registerer prometheus.Registerer,
383-
bucketClientFactory func(ctx context.Context) (objstore.Bucket, error),
383+
bucketClientFactory func(ctx context.Context) (objstore.InstrumentedBucket, error),
384384
blocksGrouperFactory BlocksGrouperFactory,
385385
blocksCompactorFactory BlocksCompactorFactory,
386386
limits *validation.Overrides,

pkg/compactor/compactor_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1647,7 +1647,7 @@ func prepareConfig() Config {
16471647
return compactorCfg
16481648
}
16491649

1650-
func prepare(t *testing.T, compactorCfg Config, bucketClient objstore.Bucket, limits *validation.Limits) (*Compactor, *tsdbCompactorMock, *tsdbPlannerMock, *concurrency.SyncBuffer, prometheus.Gatherer) {
1650+
func prepare(t *testing.T, compactorCfg Config, bucketClient objstore.InstrumentedBucket, limits *validation.Limits) (*Compactor, *tsdbCompactorMock, *tsdbPlannerMock, *concurrency.SyncBuffer, prometheus.Gatherer) {
16511651
storageCfg := cortex_tsdb.BlocksStorageConfig{}
16521652
flagext.DefaultValues(&storageCfg)
16531653

@@ -1670,7 +1670,7 @@ func prepare(t *testing.T, compactorCfg Config, bucketClient objstore.Bucket, li
16701670
overrides, err := validation.NewOverrides(*limits, nil)
16711671
require.NoError(t, err)
16721672

1673-
bucketClientFactory := func(ctx context.Context) (objstore.Bucket, error) {
1673+
bucketClientFactory := func(ctx context.Context) (objstore.InstrumentedBucket, error) {
16741674
return bucketClient, nil
16751675
}
16761676

@@ -1845,7 +1845,7 @@ func TestCompactor_DeleteLocalSyncFiles(t *testing.T) {
18451845
userIDs = append(userIDs, fmt.Sprintf("user-%d", i))
18461846
}
18471847

1848-
inmem := objstore.NewInMemBucket()
1848+
inmem := objstore.WithNoopInstr(objstore.NewInMemBucket())
18491849
for _, userID := range userIDs {
18501850
id, err := ulid.New(ulid.Now(), rand.Reader)
18511851
require.NoError(t, err)
@@ -1956,7 +1956,7 @@ func TestCompactor_ShouldFailCompactionOnTimeout(t *testing.T) {
19561956
}
19571957

19581958
func TestCompactor_ShouldNotTreatInterruptionsAsErrors(t *testing.T) {
1959-
bucketClient := objstore.NewInMemBucket()
1959+
bucketClient := objstore.WithNoopInstr(objstore.NewInMemBucket())
19601960
id := ulid.MustNew(ulid.Now(), rand.Reader)
19611961
require.NoError(t, bucketClient.Upload(context.Background(), "user-1/"+id.String()+"/meta.json", strings.NewReader(mockBlockMetaJSON(id.String()))))
19621962

pkg/ingester/ingester_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2731,7 +2731,7 @@ func TestIngester_dontShipBlocksWhenTenantDeletionMarkerIsPresent(t *testing.T)
27312731
numObjects := len(bucket.Objects())
27322732
require.NotZero(t, numObjects)
27332733

2734-
require.NoError(t, cortex_tsdb.WriteTenantDeletionMark(context.Background(), bucket, userID, cortex_tsdb.NewTenantDeletionMark(time.Now())))
2734+
require.NoError(t, cortex_tsdb.WriteTenantDeletionMark(context.Background(), objstore.WithNoopInstr(bucket), userID, cortex_tsdb.NewTenantDeletionMark(time.Now())))
27352735
numObjects++ // For deletion marker
27362736

27372737
db := i.getTSDB(userID)
@@ -2763,7 +2763,7 @@ func TestIngester_seriesCountIsCorrectAfterClosingTSDBForDeletedTenant(t *testin
27632763
bucket := objstore.NewInMemBucket()
27642764

27652765
// Write tenant deletion mark.
2766-
require.NoError(t, cortex_tsdb.WriteTenantDeletionMark(context.Background(), bucket, userID, cortex_tsdb.NewTenantDeletionMark(time.Now())))
2766+
require.NoError(t, cortex_tsdb.WriteTenantDeletionMark(context.Background(), objstore.WithNoopInstr(bucket), userID, cortex_tsdb.NewTenantDeletionMark(time.Now())))
27672767

27682768
i.TSDBState.bucket = bucket
27692769
require.NoError(t, services.StartAndAwaitRunning(context.Background(), i))

pkg/purger/tenant_deletion_api.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ import (
2020
)
2121

2222
type TenantDeletionAPI struct {
23-
bucketClient objstore.Bucket
23+
bucketClient objstore.InstrumentedBucket
2424
logger log.Logger
2525
cfgProvider bucket.TenantConfigProvider
2626
}
@@ -34,7 +34,7 @@ func NewTenantDeletionAPI(storageCfg cortex_tsdb.BlocksStorageConfig, cfgProvide
3434
return newTenantDeletionAPI(bucketClient, cfgProvider, logger), nil
3535
}
3636

37-
func newTenantDeletionAPI(bkt objstore.Bucket, cfgProvider bucket.TenantConfigProvider, logger log.Logger) *TenantDeletionAPI {
37+
func newTenantDeletionAPI(bkt objstore.InstrumentedBucket, cfgProvider bucket.TenantConfigProvider, logger log.Logger) *TenantDeletionAPI {
3838
return &TenantDeletionAPI{
3939
bucketClient: bkt,
4040
cfgProvider: cfgProvider,
@@ -118,7 +118,7 @@ func (api *TenantDeletionAPI) isBlocksForUserDeleted(ctx context.Context, userID
118118
return true, nil
119119
}
120120

121-
func createBucketClient(cfg cortex_tsdb.BlocksStorageConfig, logger log.Logger, reg prometheus.Registerer) (objstore.Bucket, error) {
121+
func createBucketClient(cfg cortex_tsdb.BlocksStorageConfig, logger log.Logger, reg prometheus.Registerer) (objstore.InstrumentedBucket, error) {
122122
bucketClient, err := bucket.NewClient(context.Background(), cfg.Bucket, "purger", logger, reg)
123123
if err != nil {
124124
return nil, errors.Wrap(err, "create bucket client")

pkg/purger/tenant_deletion_api_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ import (
1717

1818
func TestDeleteTenant(t *testing.T) {
1919
bkt := objstore.NewInMemBucket()
20-
api := newTenantDeletionAPI(bkt, nil, log.NewNopLogger())
20+
api := newTenantDeletionAPI(objstore.WithNoopInstr(bkt), nil, log.NewNopLogger())
2121

2222
{
2323
resp := httptest.NewRecorder()
@@ -80,7 +80,7 @@ func TestDeleteTenantStatus(t *testing.T) {
8080
require.NoError(t, bkt.Upload(context.Background(), objName, bytes.NewReader(data)))
8181
}
8282

83-
api := newTenantDeletionAPI(bkt, nil, log.NewNopLogger())
83+
api := newTenantDeletionAPI(objstore.WithNoopInstr(bkt), nil, log.NewNopLogger())
8484

8585
res, err := api.isBlocksForUserDeleted(context.Background(), username)
8686
require.NoError(t, err)

pkg/storage/bucket/client.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ type Config struct {
5757

5858
// Not used internally, meant to allow callers to wrap Buckets
5959
// created using this config
60-
Middlewares []func(objstore.Bucket) (objstore.Bucket, error) `yaml:"-"`
60+
Middlewares []func(objstore.InstrumentedBucket) (objstore.InstrumentedBucket, error) `yaml:"-"`
6161

6262
// Used to inject additional backends into the config. Allows for this config to
6363
// be embedded in multiple contexts and support non-object storage based backends.
@@ -103,7 +103,8 @@ func (cfg *Config) Validate() error {
103103
}
104104

105105
// NewClient creates a new bucket client based on the configured backend
106-
func NewClient(ctx context.Context, cfg Config, name string, logger log.Logger, reg prometheus.Registerer) (client objstore.Bucket, err error) {
106+
func NewClient(ctx context.Context, cfg Config, name string, logger log.Logger, reg prometheus.Registerer) (bucket objstore.InstrumentedBucket, err error) {
107+
var client objstore.Bucket
107108
switch cfg.Backend {
108109
case S3:
109110
client, err = s3.NewBucketClient(cfg.S3, name, logger)
@@ -123,17 +124,17 @@ func NewClient(ctx context.Context, cfg Config, name string, logger log.Logger,
123124
return nil, err
124125
}
125126

126-
client = opentracing.WrapWithTraces(bucketWithMetrics(client, name, reg))
127+
iClient := opentracing.WrapWithTraces(bucketWithMetrics(client, name, reg))
127128

128129
// Wrap the client with any provided middleware
129130
for _, wrap := range cfg.Middlewares {
130-
client, err = wrap(client)
131+
iClient, err = wrap(iClient)
131132
if err != nil {
132133
return nil, err
133134
}
134135
}
135136

136-
return client, nil
137+
return iClient, nil
137138
}
138139

139140
func bucketWithMetrics(bucketClient objstore.Bucket, name string, reg prometheus.Registerer) objstore.Bucket {

pkg/storage/bucket/client_mock.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,14 @@ type ClientMock struct {
2323
uploaded sync.Map
2424
}
2525

26+
func (m *ClientMock) WithExpectedErrs(objstore.IsOpFailureExpectedFunc) objstore.Bucket {
27+
return m
28+
}
29+
30+
func (m *ClientMock) ReaderWithExpectedErrs(objstore.IsOpFailureExpectedFunc) objstore.BucketReader {
31+
return m
32+
}
33+
2634
// Upload mocks objstore.Bucket.Upload()
2735
func (m *ClientMock) Upload(ctx context.Context, name string, r io.Reader) error {
2836
if _, ok := m.uploaded.Load(name); ok {

pkg/storage/tsdb/bucketindex/markers_bucket_client.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ type globalMarkersBucket struct {
1818

1919
// BucketWithGlobalMarkers wraps the input bucket into a bucket which also keeps track of markers
2020
// in the global markers location.
21-
func BucketWithGlobalMarkers(b objstore.Bucket) objstore.Bucket {
21+
func BucketWithGlobalMarkers(b objstore.InstrumentedBucket) objstore.InstrumentedBucket {
2222
return &globalMarkersBucket{
2323
parent: b,
2424
}

pkg/storage/tsdb/bucketindex/markers_bucket_client_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -195,8 +195,8 @@ func TestBucketWithGlobalMarkers_ShouldRetryUpload(t *testing.T) {
195195
Bucket: bkt,
196196
UploadFailures: map[string]error{p: errors.New("test")},
197197
}
198-
bkt, _ = s3.NewBucketWithRetries(mBucket, 5, 0, 0, log.NewNopLogger())
199-
bkt = BucketWithGlobalMarkers(bkt)
198+
s3Bkt, _ := s3.NewBucketWithRetries(mBucket, 5, 0, 0, log.NewNopLogger())
199+
bkt = BucketWithGlobalMarkers(objstore.WithNoopInstr(s3Bkt))
200200
originalPath := block1.String() + "/" + tc.mark
201201
err := bkt.Upload(ctx, originalPath, strings.NewReader("{}"))
202202
require.Equal(t, errors.New("test"), err)

0 commit comments

Comments
 (0)