Skip to content

Commit e98dfb6

Browse files
authored
Write block deletion marks in the global location too (#3561)
* Introduced bucket index writer Signed-off-by: Marco Pracucci <[email protected]> * Introduced bucket index reader Signed-off-by: Marco Pracucci <[email protected]> * Added gzip compression to bucket-index.json Signed-off-by: Marco Pracucci <[email protected]> * Introduced globalMarkersBucket Signed-off-by: Marco Pracucci <[email protected]> * Do not use InstrumentedBucket Signed-off-by: Marco Pracucci <[email protected]> * Integrated BucketWithGlobalMarkers() in the compactor Signed-off-by: Marco Pracucci <[email protected]> * Fixed linter Signed-off-by: Marco Pracucci <[email protected]> * Fixed unit test Signed-off-by: Marco Pracucci <[email protected]> * Addressed review comments Signed-off-by: Marco Pracucci <[email protected]> * Removed unused function Signed-off-by: Marco Pracucci <[email protected]> * Addressed last comment Signed-off-by: Marco Pracucci <[email protected]>
1 parent 6365787 commit e98dfb6

14 files changed

+1060
-105
lines changed

pkg/compactor/blocks_cleaner_test.go

Lines changed: 21 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ import (
77
"io/ioutil"
88
"os"
99
"path"
10-
"path/filepath"
1110
"testing"
1211
"time"
1312

@@ -20,6 +19,7 @@ import (
2019

2120
"github.com/cortexproject/cortex/pkg/storage/bucket/filesystem"
2221
"github.com/cortexproject/cortex/pkg/storage/tsdb"
22+
"github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex"
2323
"github.com/cortexproject/cortex/pkg/util/services"
2424
)
2525

@@ -49,30 +49,31 @@ func testBlocksCleanerWithConcurrency(t *testing.T, concurrency int) {
4949
// Create a bucket client on the local storage.
5050
bucketClient, err := filesystem.NewBucketClient(filesystem.Config{Directory: storageDir})
5151
require.NoError(t, err)
52+
bucketClient = bucketindex.BucketWithGlobalMarkers(bucketClient)
5253

5354
// Create blocks.
5455
ctx := context.Background()
5556
now := time.Now()
5657
deletionDelay := 12 * time.Hour
57-
block1 := createTSDBBlock(t, filepath.Join(storageDir, "user-1"), 10, 20, nil)
58-
block2 := createTSDBBlock(t, filepath.Join(storageDir, "user-1"), 20, 30, nil)
59-
block3 := createTSDBBlock(t, filepath.Join(storageDir, "user-1"), 30, 40, nil)
58+
block1 := createTSDBBlock(t, bucketClient, "user-1", 10, 20, nil)
59+
block2 := createTSDBBlock(t, bucketClient, "user-1", 20, 30, nil)
60+
block3 := createTSDBBlock(t, bucketClient, "user-1", 30, 40, nil)
6061
block4 := ulid.MustNew(4, rand.Reader)
6162
block5 := ulid.MustNew(5, rand.Reader)
62-
block6 := createTSDBBlock(t, filepath.Join(storageDir, "user-1"), 40, 50, nil)
63-
block7 := createTSDBBlock(t, filepath.Join(storageDir, "user-2"), 10, 20, nil)
64-
block8 := createTSDBBlock(t, filepath.Join(storageDir, "user-2"), 40, 50, nil)
65-
createDeletionMark(t, filepath.Join(storageDir, "user-1"), block2, now.Add(-deletionDelay).Add(time.Hour)) // Block hasn't reached the deletion threshold yet.
66-
createDeletionMark(t, filepath.Join(storageDir, "user-1"), block3, now.Add(-deletionDelay).Add(-time.Hour)) // Block reached the deletion threshold.
67-
createDeletionMark(t, filepath.Join(storageDir, "user-1"), block4, now.Add(-deletionDelay).Add(time.Hour)) // Partial block hasn't reached the deletion threshold yet.
68-
createDeletionMark(t, filepath.Join(storageDir, "user-1"), block5, now.Add(-deletionDelay).Add(-time.Hour)) // Partial block reached the deletion threshold.
69-
require.NoError(t, bucketClient.Delete(ctx, path.Join("user-1", block6.String(), metadata.MetaFilename))) // Partial block without deletion mark.
70-
createDeletionMark(t, filepath.Join(storageDir, "user-2"), block7, now.Add(-deletionDelay).Add(-time.Hour)) // Block reached the deletion threshold.
63+
block6 := createTSDBBlock(t, bucketClient, "user-1", 40, 50, nil)
64+
block7 := createTSDBBlock(t, bucketClient, "user-2", 10, 20, nil)
65+
block8 := createTSDBBlock(t, bucketClient, "user-2", 40, 50, nil)
66+
createDeletionMark(t, bucketClient, "user-1", block2, now.Add(-deletionDelay).Add(time.Hour)) // Block hasn't reached the deletion threshold yet.
67+
createDeletionMark(t, bucketClient, "user-1", block3, now.Add(-deletionDelay).Add(-time.Hour)) // Block reached the deletion threshold.
68+
createDeletionMark(t, bucketClient, "user-1", block4, now.Add(-deletionDelay).Add(time.Hour)) // Partial block hasn't reached the deletion threshold yet.
69+
createDeletionMark(t, bucketClient, "user-1", block5, now.Add(-deletionDelay).Add(-time.Hour)) // Partial block reached the deletion threshold.
70+
require.NoError(t, bucketClient.Delete(ctx, path.Join("user-1", block6.String(), metadata.MetaFilename))) // Partial block without deletion mark.
71+
createDeletionMark(t, bucketClient, "user-2", block7, now.Add(-deletionDelay).Add(-time.Hour)) // Block reached the deletion threshold.
7172

7273
// Blocks for user-3, marked for deletion.
7374
require.NoError(t, tsdb.WriteTenantDeletionMark(context.Background(), bucketClient, "user-3"))
74-
block9 := createTSDBBlock(t, filepath.Join(storageDir, "user-3"), 10, 30, nil)
75-
block10 := createTSDBBlock(t, filepath.Join(storageDir, "user-3"), 30, 50, nil)
75+
block9 := createTSDBBlock(t, bucketClient, "user-3", 10, 30, nil)
76+
block10 := createTSDBBlock(t, bucketClient, "user-3", 30, 50, nil)
7677

7778
cfg := BlocksCleanerConfig{
7879
DataDir: dataDir,
@@ -96,14 +97,18 @@ func testBlocksCleanerWithConcurrency(t *testing.T, concurrency int) {
9697
// Check the storage to ensure only the block which has reached the deletion threshold
9798
// has been effectively deleted.
9899
{path: path.Join("user-1", block1.String(), metadata.MetaFilename), expectedExists: true},
99-
{path: path.Join("user-1", block2.String(), metadata.MetaFilename), expectedExists: true},
100100
{path: path.Join("user-1", block3.String(), metadata.MetaFilename), expectedExists: false},
101101
{path: path.Join("user-2", block7.String(), metadata.MetaFilename), expectedExists: false},
102102
{path: path.Join("user-2", block8.String(), metadata.MetaFilename), expectedExists: true},
103+
// Should not delete a block with deletion mark who hasn't reached the deletion threshold yet.
104+
{path: path.Join("user-1", block2.String(), metadata.MetaFilename), expectedExists: true},
105+
{path: path.Join("user-1", bucketindex.BlockDeletionMarkFilepath(block2)), expectedExists: true},
103106
// Should delete a partial block with deletion mark who hasn't reached the deletion threshold yet.
104107
{path: path.Join("user-1", block4.String(), metadata.DeletionMarkFilename), expectedExists: false},
108+
{path: path.Join("user-1", bucketindex.BlockDeletionMarkFilepath(block4)), expectedExists: false},
105109
// Should delete a partial block with deletion mark who has reached the deletion threshold.
106110
{path: path.Join("user-1", block5.String(), metadata.DeletionMarkFilename), expectedExists: false},
111+
{path: path.Join("user-1", bucketindex.BlockDeletionMarkFilepath(block5)), expectedExists: false},
107112
// Should not delete a partial block without deletion mark.
108113
{path: path.Join("user-1", block6.String(), "index"), expectedExists: true},
109114
// Should completely delete blocks for user-3, marked for deletion

pkg/compactor/compactor.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"github.com/cortexproject/cortex/pkg/ring"
2626
"github.com/cortexproject/cortex/pkg/storage/bucket"
2727
cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb"
28+
"github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex"
2829
"github.com/cortexproject/cortex/pkg/util"
2930
"github.com/cortexproject/cortex/pkg/util/flagext"
3031
"github.com/cortexproject/cortex/pkg/util/services"
@@ -270,6 +271,9 @@ func (c *Compactor) starting(ctx context.Context) error {
270271
return errors.Wrap(err, "failed to initialize compactor objects")
271272
}
272273

274+
// Wrap the bucket client to write block deletion marks in the global location too.
275+
c.bucketClient = bucketindex.BucketWithGlobalMarkers(c.bucketClient)
276+
273277
// Create the users scanner.
274278
c.usersScanner = cortex_tsdb.NewUsersScanner(c.bucketClient, c.ownUser, c.parentLogger)
275279

pkg/compactor/compactor_test.go

Lines changed: 30 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
package compactor
22

33
import (
4+
"bytes"
45
"context"
56
"encoding/json"
67
"flag"
78
"fmt"
89
"io/ioutil"
910
"os"
10-
"os/exec"
1111
"path"
1212
"path/filepath"
1313
"strconv"
@@ -538,6 +538,7 @@ func TestCompactor_ShouldNotCompactBlocksMarkedForDeletion(t *testing.T) {
538538
bucketClient.MockIter("user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ", []string{"user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ/meta.json", "user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ/deletion-mark.json"}, nil)
539539
bucketClient.MockDelete("user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ/meta.json", nil)
540540
bucketClient.MockDelete("user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ/deletion-mark.json", nil)
541+
bucketClient.MockDelete("user-1/markers/01DTW0ZCPDDNV4BV83Q2SV4QAZ-deletion-mark.json", nil)
541542
bucketClient.MockDelete("user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ", nil)
542543

543544
c, _, tsdbPlanner, logs, registry, cleanup := prepare(t, cfg, bucketClient)
@@ -863,7 +864,7 @@ func TestCompactor_ShouldCompactOnlyUsersOwnedByTheInstanceOnShardingEnabledAndM
863864
}
864865
}
865866

866-
func createTSDBBlock(t *testing.T, dir string, minT, maxT int64, externalLabels map[string]string) ulid.ULID {
867+
func createTSDBBlock(t *testing.T, bkt objstore.Bucket, userID string, minT, maxT int64, externalLabels map[string]string) ulid.ULID {
867868
// Create a temporary dir for TSDB.
868869
tempDir, err := ioutil.TempDir(os.TempDir(), "tsdb")
869870
require.NoError(t, err)
@@ -916,24 +917,40 @@ func createTSDBBlock(t *testing.T, dir string, minT, maxT int64, externalLabels
916917
_, err = metadata.InjectThanos(log.NewNopLogger(), filepath.Join(snapshotDir, blockID.String()), meta, nil)
917918
require.NoError(t, err)
918919

919-
// Ensure the output directory exists.
920-
require.NoError(t, os.MkdirAll(dir, os.ModePerm))
920+
// Copy the block files to the bucket.
921+
srcRoot := filepath.Join(snapshotDir, blockID.String())
922+
require.NoError(t, filepath.Walk(srcRoot, func(file string, info os.FileInfo, err error) error {
923+
if err != nil {
924+
return err
925+
}
926+
if info.IsDir() {
927+
return nil
928+
}
921929

922-
// Copy the block files to the storage dir.
923-
require.NoError(t, exec.Command("cp", "-r", filepath.Join(snapshotDir, blockID.String()), dir).Run())
930+
// Read the file content in memory.
931+
content, err := ioutil.ReadFile(file)
932+
if err != nil {
933+
return err
934+
}
935+
936+
// Upload it to the bucket.
937+
relPath, err := filepath.Rel(srcRoot, file)
938+
if err != nil {
939+
return err
940+
}
941+
942+
return bkt.Upload(context.Background(), path.Join(userID, blockID.String(), relPath), bytes.NewReader(content))
943+
}))
924944

925945
return blockID
926946
}
927947

928-
func createDeletionMark(t *testing.T, dir string, blockID ulid.ULID, deletionTime time.Time) {
948+
func createDeletionMark(t *testing.T, bkt objstore.Bucket, userID string, blockID ulid.ULID, deletionTime time.Time) {
929949
content := mockDeletionMarkJSON(blockID.String(), deletionTime)
930-
blockPath := filepath.Join(dir, blockID.String())
931-
markPath := filepath.Join(blockPath, metadata.DeletionMarkFilename)
932-
933-
// Ensure the block directory exists.
934-
require.NoError(t, os.MkdirAll(blockPath, os.ModePerm))
950+
blockPath := path.Join(userID, blockID.String())
951+
markPath := path.Join(blockPath, metadata.DeletionMarkFilename)
935952

936-
require.NoError(t, ioutil.WriteFile(markPath, []byte(content), os.ModePerm))
953+
require.NoError(t, bkt.Upload(context.Background(), markPath, strings.NewReader(content)))
937954
}
938955

939956
func findCompactorByUserID(compactors []*Compactor, logs []*concurrency.SyncBuffer, userID string) (*Compactor, *concurrency.SyncBuffer, error) {

pkg/querier/blocks_scanner_test.go

Lines changed: 25 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,6 @@ package querier
22

33
import (
44
"context"
5-
"crypto/rand"
6-
"encoding/json"
75
"fmt"
86
"io/ioutil"
97
"os"
@@ -21,13 +19,13 @@ import (
2119
"github.com/stretchr/testify/assert"
2220
"github.com/stretchr/testify/mock"
2321
"github.com/stretchr/testify/require"
24-
"github.com/thanos-io/thanos/pkg/block/metadata"
2522
"github.com/thanos-io/thanos/pkg/objstore"
2623

2724
"github.com/cortexproject/cortex/pkg/storage/bucket"
2825
"github.com/cortexproject/cortex/pkg/storage/bucket/filesystem"
2926
cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb"
3027
"github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex"
28+
cortex_testutil "github.com/cortexproject/cortex/pkg/storage/tsdb/testutil"
3129
"github.com/cortexproject/cortex/pkg/util/services"
3230
)
3331

@@ -36,10 +34,10 @@ func TestBlocksScanner_InitialScan(t *testing.T) {
3634
s, bucket, _, reg, cleanup := prepareBlocksScanner(t, prepareBlocksScannerConfig())
3735
defer cleanup()
3836

39-
user1Block1 := mockStorageBlock(t, bucket, "user-1", 10, 20)
40-
user1Block2 := mockStorageBlock(t, bucket, "user-1", 20, 30)
41-
user2Block1 := mockStorageBlock(t, bucket, "user-2", 10, 20)
42-
user2Mark1 := bucketindex.BlockDeletionMarkFromThanosMarker(mockStorageDeletionMark(t, bucket, "user-2", user2Block1))
37+
user1Block1 := cortex_testutil.MockStorageBlock(t, bucket, "user-1", 10, 20)
38+
user1Block2 := cortex_testutil.MockStorageBlock(t, bucket, "user-1", 20, 30)
39+
user2Block1 := cortex_testutil.MockStorageBlock(t, bucket, "user-2", 10, 20)
40+
user2Mark1 := bucketindex.BlockDeletionMarkFromThanosMarker(cortex_testutil.MockStorageDeletionMark(t, bucket, "user-2", user2Block1))
4341

4442
require.NoError(t, services.StartAndAwaitRunning(ctx, s))
4543

@@ -224,9 +222,9 @@ func TestBlocksScanner_PeriodicScanFindsNewUser(t *testing.T) {
224222
require.Equal(t, 0, len(blocks))
225223
assert.Empty(t, deletionMarks)
226224

227-
block1 := mockStorageBlock(t, bucket, "user-1", 10, 20)
228-
block2 := mockStorageBlock(t, bucket, "user-1", 20, 30)
229-
mark2 := bucketindex.BlockDeletionMarkFromThanosMarker(mockStorageDeletionMark(t, bucket, "user-1", block2))
225+
block1 := cortex_testutil.MockStorageBlock(t, bucket, "user-1", 10, 20)
226+
block2 := cortex_testutil.MockStorageBlock(t, bucket, "user-1", 20, 30)
227+
mark2 := bucketindex.BlockDeletionMarkFromThanosMarker(cortex_testutil.MockStorageDeletionMark(t, bucket, "user-1", block2))
230228

231229
// Trigger a periodic sync
232230
require.NoError(t, s.scan(ctx))
@@ -248,7 +246,7 @@ func TestBlocksScanner_PeriodicScanFindsNewBlock(t *testing.T) {
248246
s, bucket, _, _, cleanup := prepareBlocksScanner(t, prepareBlocksScannerConfig())
249247
defer cleanup()
250248

251-
block1 := mockStorageBlock(t, bucket, "user-1", 10, 20)
249+
block1 := cortex_testutil.MockStorageBlock(t, bucket, "user-1", 10, 20)
252250

253251
require.NoError(t, services.StartAndAwaitRunning(ctx, s))
254252

@@ -259,7 +257,7 @@ func TestBlocksScanner_PeriodicScanFindsNewBlock(t *testing.T) {
259257
assert.WithinDuration(t, time.Now(), blocks[0].GetUploadedAt(), 5*time.Second)
260258
assert.Empty(t, deletionMarks)
261259

262-
block2 := mockStorageBlock(t, bucket, "user-1", 20, 30)
260+
block2 := cortex_testutil.MockStorageBlock(t, bucket, "user-1", 20, 30)
263261

264262
// Trigger a periodic sync
265263
require.NoError(t, s.scan(ctx))
@@ -279,8 +277,8 @@ func TestBlocksScanner_PeriodicScanFindsBlockMarkedForDeletion(t *testing.T) {
279277
s, bucket, _, _, cleanup := prepareBlocksScanner(t, prepareBlocksScannerConfig())
280278
defer cleanup()
281279

282-
block1 := mockStorageBlock(t, bucket, "user-1", 10, 20)
283-
block2 := mockStorageBlock(t, bucket, "user-1", 20, 30)
280+
block1 := cortex_testutil.MockStorageBlock(t, bucket, "user-1", 10, 20)
281+
block2 := cortex_testutil.MockStorageBlock(t, bucket, "user-1", 20, 30)
284282

285283
require.NoError(t, services.StartAndAwaitRunning(ctx, s))
286284

@@ -291,7 +289,7 @@ func TestBlocksScanner_PeriodicScanFindsBlockMarkedForDeletion(t *testing.T) {
291289
assert.Equal(t, block1.ULID, blocks[1].ID)
292290
assert.Empty(t, deletionMarks)
293291

294-
mark1 := bucketindex.BlockDeletionMarkFromThanosMarker(mockStorageDeletionMark(t, bucket, "user-1", block1))
292+
mark1 := bucketindex.BlockDeletionMarkFromThanosMarker(cortex_testutil.MockStorageDeletionMark(t, bucket, "user-1", block1))
295293

296294
// Trigger a periodic sync
297295
require.NoError(t, s.scan(ctx))
@@ -311,8 +309,8 @@ func TestBlocksScanner_PeriodicScanFindsDeletedBlock(t *testing.T) {
311309
s, bucket, _, _, cleanup := prepareBlocksScanner(t, prepareBlocksScannerConfig())
312310
defer cleanup()
313311

314-
block1 := mockStorageBlock(t, bucket, "user-1", 10, 20)
315-
block2 := mockStorageBlock(t, bucket, "user-1", 20, 30)
312+
block1 := cortex_testutil.MockStorageBlock(t, bucket, "user-1", 10, 20)
313+
block2 := cortex_testutil.MockStorageBlock(t, bucket, "user-1", 20, 30)
316314

317315
require.NoError(t, services.StartAndAwaitRunning(ctx, s))
318316

@@ -340,8 +338,8 @@ func TestBlocksScanner_PeriodicScanFindsDeletedUser(t *testing.T) {
340338
s, bucket, _, _, cleanup := prepareBlocksScanner(t, prepareBlocksScannerConfig())
341339
defer cleanup()
342340

343-
block1 := mockStorageBlock(t, bucket, "user-1", 10, 20)
344-
block2 := mockStorageBlock(t, bucket, "user-1", 20, 30)
341+
block1 := cortex_testutil.MockStorageBlock(t, bucket, "user-1", 10, 20)
342+
block2 := cortex_testutil.MockStorageBlock(t, bucket, "user-1", 20, 30)
345343

346344
require.NoError(t, services.StartAndAwaitRunning(ctx, s))
347345

@@ -368,8 +366,8 @@ func TestBlocksScanner_PeriodicScanFindsUserWhichWasPreviouslyDeleted(t *testing
368366
s, bucket, _, _, cleanup := prepareBlocksScanner(t, prepareBlocksScannerConfig())
369367
defer cleanup()
370368

371-
block1 := mockStorageBlock(t, bucket, "user-1", 10, 20)
372-
block2 := mockStorageBlock(t, bucket, "user-1", 20, 30)
369+
block1 := cortex_testutil.MockStorageBlock(t, bucket, "user-1", 10, 20)
370+
block2 := cortex_testutil.MockStorageBlock(t, bucket, "user-1", 20, 30)
373371

374372
require.NoError(t, services.StartAndAwaitRunning(ctx, s))
375373

@@ -390,7 +388,7 @@ func TestBlocksScanner_PeriodicScanFindsUserWhichWasPreviouslyDeleted(t *testing
390388
require.Equal(t, 0, len(blocks))
391389
assert.Empty(t, deletionMarks)
392390

393-
block3 := mockStorageBlock(t, bucket, "user-1", 30, 40)
391+
block3 := cortex_testutil.MockStorageBlock(t, bucket, "user-1", 30, 40)
394392

395393
// Trigger a periodic sync
396394
require.NoError(t, s.scan(ctx))
@@ -407,11 +405,11 @@ func TestBlocksScanner_GetBlocks(t *testing.T) {
407405
s, bucket, _, _, cleanup := prepareBlocksScanner(t, prepareBlocksScannerConfig())
408406
defer cleanup()
409407

410-
block1 := mockStorageBlock(t, bucket, "user-1", 10, 15)
411-
block2 := mockStorageBlock(t, bucket, "user-1", 12, 20)
412-
block3 := mockStorageBlock(t, bucket, "user-1", 20, 30)
413-
block4 := mockStorageBlock(t, bucket, "user-1", 30, 40)
414-
mark3 := bucketindex.BlockDeletionMarkFromThanosMarker(mockStorageDeletionMark(t, bucket, "user-1", block3))
408+
block1 := cortex_testutil.MockStorageBlock(t, bucket, "user-1", 10, 15)
409+
block2 := cortex_testutil.MockStorageBlock(t, bucket, "user-1", 12, 20)
410+
block3 := cortex_testutil.MockStorageBlock(t, bucket, "user-1", 20, 30)
411+
block4 := cortex_testutil.MockStorageBlock(t, bucket, "user-1", 30, 40)
412+
mark3 := bucketindex.BlockDeletionMarkFromThanosMarker(cortex_testutil.MockStorageDeletionMark(t, bucket, "user-1", block3))
415413

416414
require.NoError(t, services.StartAndAwaitRunning(ctx, s))
417415

@@ -523,50 +521,3 @@ func prepareBlocksScannerConfig() BlocksScannerConfig {
523521
IgnoreDeletionMarksDelay: time.Hour,
524522
}
525523
}
526-
527-
func mockStorageBlock(t *testing.T, bucket objstore.Bucket, userID string, minT, maxT int64) tsdb.BlockMeta {
528-
// Generate a block ID whose timestamp matches the maxT (for simplicity we assume it
529-
// has been compacted and shipped in zero time, even if not realistic).
530-
id := ulid.MustNew(uint64(maxT), rand.Reader)
531-
532-
meta := tsdb.BlockMeta{
533-
Version: 1,
534-
ULID: id,
535-
MinTime: minT,
536-
MaxTime: maxT,
537-
Compaction: tsdb.BlockMetaCompaction{
538-
Level: 1,
539-
Sources: []ulid.ULID{id},
540-
},
541-
}
542-
543-
metaContent, err := json.Marshal(meta)
544-
if err != nil {
545-
panic("failed to marshal mocked block meta")
546-
}
547-
548-
metaContentReader := strings.NewReader(string(metaContent))
549-
metaPath := fmt.Sprintf("%s/%s/meta.json", userID, id.String())
550-
require.NoError(t, bucket.Upload(context.Background(), metaPath, metaContentReader))
551-
552-
return meta
553-
}
554-
555-
func mockStorageDeletionMark(t *testing.T, bucket objstore.Bucket, userID string, meta tsdb.BlockMeta) *metadata.DeletionMark {
556-
mark := metadata.DeletionMark{
557-
ID: meta.ULID,
558-
DeletionTime: time.Now().Add(-time.Minute).Unix(),
559-
Version: metadata.DeletionMarkVersion1,
560-
}
561-
562-
markContent, err := json.Marshal(mark)
563-
if err != nil {
564-
panic("failed to marshal mocked block meta")
565-
}
566-
567-
markContentReader := strings.NewReader(string(markContent))
568-
markPath := fmt.Sprintf("%s/%s/%s", userID, meta.ULID.String(), metadata.DeletionMarkFilename)
569-
require.NoError(t, bucket.Upload(context.Background(), markPath, markContentReader))
570-
571-
return &mark
572-
}

pkg/storage/tsdb/bucketindex/index.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,9 @@ import (
1616
)
1717

1818
const (
19-
IndexFilename = "bucket-index.json"
20-
IndexVersion1 = 1
19+
IndexFilename = "bucket-index.json"
20+
IndexCompressedFilename = IndexFilename + ".gz"
21+
IndexVersion1 = 1
2122

2223
SegmentsFormatUnknown = ""
2324

0 commit comments

Comments
 (0)