diff --git a/CHANGELOG.md b/CHANGELOG.md index 8e773d06977..1fd58b18b15 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,7 +1,7 @@ # Changelog ## master / unreleased - +* [FEATURE] Block Storage: Added Prometheus-style API endpoints for series deletion. The feature must first be enabled by setting `--purger.enable` to `true`. This only covers creating, getting, and cancelling delete requests; the actual deletion and query-time filtering will be part of future PRs. #4370 ## 1.13.0 in progress * [CHANGE] Changed default for `-ingester.min-ready-duration` from 1 minute to 15 seconds. #4539 diff --git a/pkg/api/api.go b/pkg/api/api.go index ced7685448f..4c26fafd739 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -284,6 +284,19 @@ func (a *API) RegisterTenantDeletion(api *purger.TenantDeletionAPI) { a.RegisterRoute("/purger/delete_tenant_status", http.HandlerFunc(api.DeleteTenantStatus), true, "GET") } +func (a *API) RegisterBlocksPurger(blocksPurger *purger.BlocksPurgerAPI) { + + a.RegisterRoute(path.Join(a.cfg.PrometheusHTTPPrefix, "/api/v1/admin/tsdb/delete_series"), http.HandlerFunc(blocksPurger.AddDeleteRequestHandler), true, "PUT", "POST") + a.RegisterRoute(path.Join(a.cfg.PrometheusHTTPPrefix, "/api/v1/admin/tsdb/delete_series"), http.HandlerFunc(blocksPurger.GetAllDeleteRequestsHandler), true, "GET") + a.RegisterRoute(path.Join(a.cfg.PrometheusHTTPPrefix, "/api/v1/admin/tsdb/cancel_delete_request"), http.HandlerFunc(blocksPurger.CancelDeleteRequestHandler), true, "PUT", "POST") + + // Legacy routes + a.RegisterRoute(path.Join(a.cfg.LegacyHTTPPrefix, "/api/v1/admin/tsdb/delete_series"), http.HandlerFunc(blocksPurger.AddDeleteRequestHandler), true, "PUT", "POST") + a.RegisterRoute(path.Join(a.cfg.LegacyHTTPPrefix, "/api/v1/admin/tsdb/delete_series"), http.HandlerFunc(blocksPurger.GetAllDeleteRequestsHandler), true, "GET") + a.RegisterRoute(path.Join(a.cfg.LegacyHTTPPrefix, "/api/v1/admin/tsdb/cancel_delete_request"), http.HandlerFunc(blocksPurger.CancelDeleteRequestHandler), true, "PUT", "POST") + +} + // RegisterRuler registers routes associated with the Ruler service.
func (a *API) RegisterRuler(r *ruler.Ruler) { a.indexPage.AddLink(SectionAdminEndpoints, "/ruler/ring", "Ruler Ring Status") diff --git a/pkg/chunk/purger/blocks_purger.go b/pkg/chunk/purger/blocks_purger.go new file mode 100644 index 00000000000..802f5b6b1f8 --- /dev/null +++ b/pkg/chunk/purger/blocks_purger.go @@ -0,0 +1,244 @@ +package purger + +import ( + "crypto/md5" + "encoding/binary" + "encoding/hex" + "encoding/json" + fmt "fmt" + "net/http" + "sort" + "time" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/labels" + "github.com/thanos-io/thanos/pkg/objstore" + + "github.com/cortexproject/cortex/pkg/storage/bucket" + cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb" + "github.com/cortexproject/cortex/pkg/tenant" + "github.com/cortexproject/cortex/pkg/util" + util_log "github.com/cortexproject/cortex/pkg/util/log" +) + +type BlocksPurgerAPI struct { + bucketClient objstore.Bucket + logger log.Logger + cfgProvider bucket.TenantConfigProvider + deleteRequestCancelPeriod time.Duration +} + +func NewBlocksPurgerAPI(storageCfg cortex_tsdb.BlocksStorageConfig, cfgProvider bucket.TenantConfigProvider, logger log.Logger, reg prometheus.Registerer, cancellationPeriod time.Duration) (*BlocksPurgerAPI, error) { + bucketClient, err := createBucketClient(storageCfg, logger, "blocks-purger", reg) + if err != nil { + return nil, err + } + + return newBlocksPurgerAPI(bucketClient, cfgProvider, logger, cancellationPeriod), nil +} + +func newBlocksPurgerAPI(bkt objstore.Bucket, cfgProvider bucket.TenantConfigProvider, logger log.Logger, cancellationPeriod time.Duration) *BlocksPurgerAPI { + return &BlocksPurgerAPI{ + bucketClient: bkt, + cfgProvider: cfgProvider, + logger: logger, + deleteRequestCancelPeriod: cancellationPeriod, + } +} + +func (api *BlocksPurgerAPI) AddDeleteRequestHandler(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + userID, err := tenant.TenantID(ctx) + if err != nil { + http.Error(w, err.Error(), http.StatusUnauthorized) + return + } + + params := r.URL.Query() + match := params["match[]"] + if len(match) == 0 { + http.Error(w, "selectors not set", http.StatusBadRequest) + return + } + + matchers, err := cortex_tsdb.ParseMatchers(match) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + startParam := params.Get("start") + startTime := int64(0) + if startParam != "" { + startTime, err = util.ParseTime(startParam) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + } + + endParam := params.Get("end") + endTime := int64(model.Now()) + + if endParam != "" { + endTime, err = util.ParseTime(endParam) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + if endTime > int64(model.Now()) { + http.Error(w, "deletes in future not allowed", http.StatusBadRequest) + return + } + } + + if startTime > endTime { + http.Error(w, "start time can't be greater than end time", http.StatusBadRequest) + return + } + + tManager := cortex_tsdb.NewTombstoneManager(api.bucketClient, userID, api.cfgProvider, api.logger) + + requestID := getTombstoneHash(startTime, endTime, matchers) + // Since the request id is based on a hash of the parameters, there is a possibility that a tombstone could already exist for it + // if the request was previously cancelled, we need to remove the cancelled tombstone before adding the pending one 
+ if err := tManager.RemoveCancelledStateIfExists(ctx, requestID); err != nil { + level.Error(util_log.Logger).Log("msg", "removing cancelled tombstone state if it exists", "err", err) + http.Error(w, "Error checking previous delete requests and removing the past cancelled version of this request if it exists ", http.StatusInternalServerError) + return + } + + prevT, err := tManager.GetTombstoneByIDForUser(ctx, requestID) + if err != nil { + level.Error(util_log.Logger).Log("msg", "error getting delete request by id", "err", err) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + if prevT != nil { + http.Error(w, "delete request tombstone with same information already exists", http.StatusBadRequest) + return + } + + curTime := time.Now().Unix() * 1000 + t := cortex_tsdb.NewTombstone(userID, curTime, curTime, startTime, endTime, match, requestID, cortex_tsdb.StatePending) + + if err = tManager.WriteTombstone(ctx, t); err != nil { + level.Error(util_log.Logger).Log("msg", "error adding delete request to the object store", "err", err) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + w.WriteHeader(http.StatusNoContent) +} + +func (api *BlocksPurgerAPI) GetAllDeleteRequestsHandler(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + userID, err := tenant.TenantID(ctx) + if err != nil { + http.Error(w, err.Error(), http.StatusUnauthorized) + return + } + + tManager := cortex_tsdb.NewTombstoneManager(api.bucketClient, userID, api.cfgProvider, api.logger) + deleteRequests, err := tManager.GetAllTombstonesForUser(ctx) + if err != nil { + level.Error(util_log.Logger).Log("msg", "error getting delete requests from the block store", "err", err) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + w.Header().Set("Content-Type", "application/json") + if err := json.NewEncoder(w).Encode(deleteRequests); err != nil { + level.Error(util_log.Logger).Log("msg", "error marshalling response", "err", err) + http.Error(w, fmt.Sprintf("Error marshalling response: %v", err), http.StatusInternalServerError) + return + } +} + +func (api *BlocksPurgerAPI) CancelDeleteRequestHandler(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + userID, err := tenant.TenantID(ctx) + if err != nil { + http.Error(w, err.Error(), http.StatusUnauthorized) + return + } + + params := r.URL.Query() + requestID := params.Get("request_id") + if len(requestID) == 0 { + http.Error(w, "request_id not set", http.StatusBadRequest) + return + } + + tManager := cortex_tsdb.NewTombstoneManager(api.bucketClient, userID, api.cfgProvider, api.logger) + deleteRequest, err := tManager.GetTombstoneByIDForUser(ctx, requestID) + if err != nil { + level.Error(util_log.Logger).Log("msg", "error getting delete request from the object store", "err", err) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + if deleteRequest == nil { + http.Error(w, "could not find delete request with given id", http.StatusBadRequest) + return + } + + if deleteRequest.State == cortex_tsdb.StateCancelled { + http.Error(w, "the series deletion request was cancelled previously", http.StatusAccepted) + return + } + + if deleteRequest.State == cortex_tsdb.StateProcessed { + http.Error(w, "deletion of request which is already processed is not allowed", http.StatusBadRequest) + return + } + + if time.Since(deleteRequest.GetCreateTime()) > api.deleteRequestCancelPeriod { + http.Error(w, fmt.Sprintf("Cancellation of request past the deadline of %s since its 
creation is not allowed", api.deleteRequestCancelPeriod.String()), http.StatusBadRequest) + return + } + + // create file with the cancelled state + _, err = tManager.UpdateTombstoneState(ctx, deleteRequest, cortex_tsdb.StateCancelled) + if err != nil { + level.Error(util_log.Logger).Log("msg", "error cancelling the delete request", "err", err) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + w.WriteHeader(http.StatusNoContent) +} + +func getTombstoneHash(startTime int64, endTime int64, selectors []*labels.Matcher) string { + // Any delete request with the same start time, end time and selectors should result in the same hash + + hash := md5.New() + + bufStart := make([]byte, 8) + binary.LittleEndian.PutUint64(bufStart, uint64(startTime)) + bufEnd := make([]byte, 8) + binary.LittleEndian.PutUint64(bufEnd, uint64(endTime)) + + hash.Write(bufStart) + hash.Write(bufEnd) + + // First we get the strings of the parsed matchers which + // then are sorted and hashed after. This is done so that logically + // equivalent deletion requests result in the same hash + selectorStrings := make([]string, len(selectors)) + for i, s := range selectors { + selectorStrings[i] = s.String() + } + + sort.Strings(selectorStrings) + for _, s := range selectorStrings { + hash.Write([]byte(s)) + } + + md5Bytes := hash.Sum(nil) + return hex.EncodeToString(md5Bytes[:]) +} diff --git a/pkg/chunk/purger/blocks_purger_test.go b/pkg/chunk/purger/blocks_purger_test.go new file mode 100644 index 00000000000..a612814b9ec --- /dev/null +++ b/pkg/chunk/purger/blocks_purger_test.go @@ -0,0 +1,328 @@ +package purger + +import ( + "context" + "math" + "net/http" + "net/http/httptest" + "net/url" + "strconv" + "testing" + "time" + + "github.com/go-kit/kit/log" + "github.com/stretchr/testify/require" + "github.com/thanos-io/thanos/pkg/objstore" + "github.com/weaveworks/common/user" + + cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb" +) + +func TestBlocksDeleteSeries_AddingDeletionRequests(t *testing.T) { + for name, tc := range map[string]struct { + parameters url.Values + expectedHTTPStatus int + }{ + "empty": { + parameters: nil, + expectedHTTPStatus: http.StatusBadRequest, + }, + + "valid request": { + parameters: url.Values{ + "start": []string{"1"}, + "end": []string{"2"}, + "match[]": []string{"selector"}, + }, + expectedHTTPStatus: http.StatusNoContent, + }, + + "end time in the future": { + parameters: url.Values{ + "start": []string{"1"}, + "end": []string{strconv.Itoa(math.MaxInt64)}, + "match[]": []string{"selector"}, + }, + expectedHTTPStatus: http.StatusBadRequest, + }, + "the start time is after the end time": { + parameters: url.Values{ + "start": []string{"2"}, + "end": []string{"1"}, + "match[]": []string{"selector"}, + }, + expectedHTTPStatus: http.StatusBadRequest, + }, + } { + t.Run(name, func(t *testing.T) { + bkt := objstore.NewInMemBucket() + api := newBlocksPurgerAPI(bkt, nil, log.NewNopLogger(), 0) + + ctx := context.Background() + ctx = user.InjectOrgID(ctx, userID) + + u := &url.URL{ + RawQuery: tc.parameters.Encode(), + } + + req := &http.Request{ + Method: "POST", + RequestURI: u.String(), + URL: u, + Body: http.NoBody, + Header: http.Header{}, + } + + resp := httptest.NewRecorder() + api.AddDeleteRequestHandler(resp, req.WithContext(ctx)) + require.Equal(t, tc.expectedHTTPStatus, resp.Code) + + }) + } +} + +func TestBlocksDeleteSeries_AddingSameRequestTwiceShouldFail(t *testing.T) { + + for name, tc := range map[string]struct { + selectorsFirst []string
+ selectorsSecond []string + }{ + "exact same request twice should fail": { + selectorsFirst: []string{"process_start_time_seconds{job=\"prometheus\"}"}, + selectorsSecond: []string{"process_start_time_seconds{job=\"prometheus\"}"}, + }, + "same request but with extra whitespace should fail": { + selectorsFirst: []string{"process_start_time_seconds{job=\"prometheus\"}"}, + selectorsSecond: []string{"process_start_time_seconds{job= \"prometheus\"}"}, + }, + // Since a deletion request is performed using the AND of all provided selectors + // it doesn't matter if the selectors are provided together as one string or + // in separate strings. + "same request but split in multiple matchers should fail": { + selectorsFirst: []string{"process_start_time_seconds{job=\"prometheus\"}"}, + selectorsSecond: []string{"process_start_time_seconds", "{job= \"prometheus\"}"}, + }, + } { + t.Run(name, func(t *testing.T) { + bkt := objstore.NewInMemBucket() + api := newBlocksPurgerAPI(bkt, nil, log.NewNopLogger(), 0) + + ctx := context.Background() + ctx = user.InjectOrgID(ctx, userID) + + params := url.Values{ + "start": []string{"1"}, + "end": []string{"2"}, + "match[]": tc.selectorsFirst, + } + + u := &url.URL{ + RawQuery: params.Encode(), + } + + req := &http.Request{ + Method: "POST", + RequestURI: u.String(), + URL: u, + Body: http.NoBody, + Header: http.Header{}, + } + + resp := httptest.NewRecorder() + api.AddDeleteRequestHandler(resp, req.WithContext(ctx)) + + // First request made should be okay + require.Equal(t, http.StatusNoContent, resp.Code) + + params = url.Values{ + "start": []string{"1"}, + "end": []string{"2"}, + "match[]": tc.selectorsSecond, + } + + u = &url.URL{ + RawQuery: params.Encode(), + } + + req = &http.Request{ + Method: "POST", + RequestURI: u.String(), + URL: u, + Body: http.NoBody, + Header: http.Header{}, + } + + //second should not be accepted because the same exact request already exists + resp = httptest.NewRecorder() + api.AddDeleteRequestHandler(resp, req.WithContext(ctx)) + require.Equal(t, http.StatusBadRequest, resp.Code) + + }) + } +} + +func TestBlocksDeleteSeries_AddingNewRequestShouldDeleteCancelledState(t *testing.T) { + + // If a tombstone has previously been cancelled, and a new request + // being made results in the same request id, the cancelled tombstone + // should be deleted from the bucket + + bkt := objstore.NewInMemBucket() + api := newBlocksPurgerAPI(bkt, nil, log.NewNopLogger(), time.Minute*5) + + ctx := context.Background() + ctx = user.InjectOrgID(ctx, userID) + + //first create a new tombstone + paramsCreate := url.Values{ + "start": []string{"1"}, + "end": []string{"2"}, + "match[]": []string{"node_exporter"}, + } + + uCreate := &url.URL{ + RawQuery: paramsCreate.Encode(), + } + + reqCreate := &http.Request{ + Method: "POST", + RequestURI: uCreate.String(), + URL: uCreate, + Body: http.NoBody, + Header: http.Header{}, + } + + resp := httptest.NewRecorder() + api.AddDeleteRequestHandler(resp, reqCreate.WithContext(ctx)) + require.Equal(t, http.StatusNoContent, resp.Code) + + //cancel the previous request + selector, _ := cortex_tsdb.ParseMatchers([]string{"node_exporter"}) + requestID := getTombstoneHash(1000, 2000, selector) + paramsDelete := url.Values{ + "request_id": []string{requestID}, + } + uCancel := &url.URL{ + RawQuery: paramsDelete.Encode(), + } + + reqCancel := &http.Request{ + Method: "POST", + RequestURI: uCancel.String(), + URL: uCancel, + Body: http.NoBody, + Header: http.Header{}, + } + + resp = httptest.NewRecorder() + 
api.CancelDeleteRequestHandler(resp, reqCancel.WithContext(ctx)) + require.Equal(t, http.StatusNoContent, resp.Code) + + // check that the cancelled file exists + tCancelledPath := userID + "/tombstones/" + requestID + "." + string(cortex_tsdb.StateCancelled) + ".json" + exists, _ := bkt.Exists(ctx, tCancelledPath) + require.True(t, exists) + + // create a new request and make sure the cancelled file no longer exists + resp = httptest.NewRecorder() + api.AddDeleteRequestHandler(resp, reqCreate.WithContext(ctx)) + require.Equal(t, http.StatusNoContent, resp.Code) + + exists, _ = bkt.Exists(ctx, tCancelledPath) + require.False(t, exists) + +} + +func TestBlocksDeleteSeries_CancellingRequest(t *testing.T) { + + for name, tc := range map[string]struct { + requestID string + createdAt int64 + requestState cortex_tsdb.BlockDeleteRequestState + cancellationPeriod time.Duration + cancelledFileExists bool + expectedHTTPStatus int + }{ + "not allowed, grace period has passed": { + requestID: "requestID", + createdAt: 0, + requestState: cortex_tsdb.StatePending, + cancellationPeriod: time.Second, + cancelledFileExists: false, + expectedHTTPStatus: http.StatusBadRequest, + }, + + "allowed, grace period not over yet": { + requestID: "requestID", + createdAt: time.Now().Unix() * 1000, + requestState: cortex_tsdb.StatePending, + cancellationPeriod: time.Hour, + cancelledFileExists: true, + expectedHTTPStatus: http.StatusNoContent, + }, + "not allowed, deletion already occurred": { + requestID: "requestID", + createdAt: 0, + requestState: cortex_tsdb.StateProcessed, + cancellationPeriod: time.Second, + cancelledFileExists: false, + expectedHTTPStatus: http.StatusBadRequest, + }, + "not allowed,request already cancelled": { + requestID: "requestID", + createdAt: 0, + requestState: cortex_tsdb.StateCancelled, + cancellationPeriod: time.Second, + cancelledFileExists: true, + expectedHTTPStatus: http.StatusAccepted, + }, + "not allowed, request id missing": { + requestID: "", + createdAt: 0, + requestState: cortex_tsdb.StatePending, + cancellationPeriod: time.Second, + cancelledFileExists: false, + expectedHTTPStatus: http.StatusBadRequest, + }, + } { + t.Run(name, func(t *testing.T) { + bkt := objstore.NewInMemBucket() + api := newBlocksPurgerAPI(bkt, nil, log.NewNopLogger(), tc.cancellationPeriod) + + ctx := context.Background() + ctx = user.InjectOrgID(ctx, userID) + + tManager := cortex_tsdb.NewTombstoneManager(api.bucketClient, userID, api.cfgProvider, log.NewNopLogger()) + + //create the tombstone + tombstone := cortex_tsdb.NewTombstone(userID, tc.createdAt, tc.createdAt, 0, 1, []string{"match"}, tc.requestID, tc.requestState) + err := tManager.WriteTombstone(ctx, tombstone) + require.NoError(t, err) + + params := url.Values{ + "request_id": []string{tc.requestID}, + } + + u := &url.URL{ + RawQuery: params.Encode(), + } + + req := &http.Request{ + Method: "POST", + RequestURI: u.String(), + URL: u, + Body: http.NoBody, + Header: http.Header{}, + } + + resp := httptest.NewRecorder() + api.CancelDeleteRequestHandler(resp, req.WithContext(ctx)) + require.Equal(t, tc.expectedHTTPStatus, resp.Code) + + // check if the cancelled tombstone file exists + exists, _ := tManager.TombstoneExists(ctx, tc.requestID, cortex_tsdb.StateCancelled) + require.Equal(t, tc.cancelledFileExists, exists) + + }) + } +} diff --git a/pkg/chunk/purger/purger.go b/pkg/chunk/purger/purger.go index 7c37c29300b..d02baa008ca 100644 --- a/pkg/chunk/purger/purger.go +++ b/pkg/chunk/purger/purger.go @@ -87,7 +87,7 @@ type 
deleteRequestWithLogger struct { // Config holds config for chunks Purger type Config struct { - Enable bool `yaml:"enable"` + EnableSeriesDeletion bool `yaml:"enable"` NumWorkers int `yaml:"num_workers"` ObjectStoreType string `yaml:"object_store_type"` DeleteRequestCancelPeriod time.Duration `yaml:"delete_request_cancel_period"` @@ -95,7 +95,7 @@ type Config struct { // RegisterFlags registers CLI flags for Config func (cfg *Config) RegisterFlags(f *flag.FlagSet) { - f.BoolVar(&cfg.Enable, "purger.enable", false, "Enable purger to allow deletion of series. Be aware that Delete series feature is still experimental") + f.BoolVar(&cfg.EnableSeriesDeletion, "purger.enable", false, "Enable purger to allow deletion of series. Be aware that Delete series feature is still experimental") f.IntVar(&cfg.NumWorkers, "purger.num-workers", 2, "Number of workers executing delete plans in parallel") f.StringVar(&cfg.ObjectStoreType, "purger.object-store-type", "", "Name of the object store to use for storing delete plans") f.DurationVar(&cfg.DeleteRequestCancelPeriod, "purger.delete-request-cancel-period", 24*time.Hour, "Allow cancellation of delete request until duration after they are created. Data would be deleted only after delete requests have been older than this duration. Ideally this should be set to at least 24h.") diff --git a/pkg/chunk/purger/tenant_deletion_api.go b/pkg/chunk/purger/tenant_deletion_api.go index a8c6b8ff671..ea67bd2e719 100644 --- a/pkg/chunk/purger/tenant_deletion_api.go +++ b/pkg/chunk/purger/tenant_deletion_api.go @@ -26,7 +26,7 @@ type TenantDeletionAPI struct { } func NewTenantDeletionAPI(storageCfg cortex_tsdb.BlocksStorageConfig, cfgProvider bucket.TenantConfigProvider, logger log.Logger, reg prometheus.Registerer) (*TenantDeletionAPI, error) { - bucketClient, err := createBucketClient(storageCfg, logger, reg) + bucketClient, err := createBucketClient(storageCfg, logger, "tenant-deletion-purger", reg) if err != nil { return nil, err } @@ -118,8 +118,8 @@ func (api *TenantDeletionAPI) isBlocksForUserDeleted(ctx context.Context, userID return true, nil } -func createBucketClient(cfg cortex_tsdb.BlocksStorageConfig, logger log.Logger, reg prometheus.Registerer) (objstore.Bucket, error) { - bucketClient, err := bucket.NewClient(context.Background(), cfg.Bucket, "purger", logger, reg) +func createBucketClient(cfg cortex_tsdb.BlocksStorageConfig, logger log.Logger, name string, reg prometheus.Registerer) (objstore.Bucket, error) { + bucketClient, err := bucket.NewClient(context.Background(), cfg.Bucket, name, logger, reg) if err != nil { return nil, errors.Wrap(err, "create bucket client") } diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go index 20e2ed62508..9a9316e7956 100644 --- a/pkg/cortex/modules.go +++ b/pkg/cortex/modules.go @@ -80,6 +80,7 @@ const ( StoreGateway string = "store-gateway" MemberlistKV string = "memberlist-kv" ChunksPurger string = "chunks-purger" + BlocksPurger string = "blocks-purger" TenantDeletion string = "tenant-deletion" Purger string = "purger" QueryScheduler string = "query-scheduler" @@ -479,7 +480,7 @@ func (t *Cortex) initChunkStore() (serv services.Service, err error) { } func (t *Cortex) initDeleteRequestsStore() (serv services.Service, err error) { - if t.Cfg.Storage.Engine != storage.StorageEngineChunks || !t.Cfg.PurgerConfig.Enable { + if t.Cfg.Storage.Engine != storage.StorageEngineChunks || !t.Cfg.PurgerConfig.EnableSeriesDeletion { // until we need to explicitly enable delete series support we need to do create 
TombstonesLoader without DeleteStore which acts as noop t.TombstonesLoader = purger.NewTombstonesLoader(nil, nil) @@ -617,7 +618,7 @@ func (t *Cortex) initTableManager() (services.Service, error) { util_log.CheckFatal("initializing bucket client", err) var extraTables []chunk.ExtraTables - if t.Cfg.PurgerConfig.Enable { + if t.Cfg.PurgerConfig.EnableSeriesDeletion { reg := prometheus.WrapRegistererWith( prometheus.Labels{"component": "table-manager-" + DeleteRequestsStore}, prometheus.DefaultRegisterer) @@ -792,7 +793,7 @@ func (t *Cortex) initMemberlistKV() (services.Service, error) { } func (t *Cortex) initChunksPurger() (services.Service, error) { - if t.Cfg.Storage.Engine != storage.StorageEngineChunks || !t.Cfg.PurgerConfig.Enable { + if t.Cfg.Storage.Engine != storage.StorageEngineChunks || !t.Cfg.PurgerConfig.EnableSeriesDeletion { return nil, nil } @@ -826,6 +827,21 @@ func (t *Cortex) initTenantDeletionAPI() (services.Service, error) { return nil, nil } +func (t *Cortex) initBlocksPurger() (services.Service, error) { + if t.Cfg.Storage.Engine != storage.StorageEngineBlocks || !t.Cfg.PurgerConfig.EnableSeriesDeletion { + return nil, nil + } + + // t.Overrides is passed as the per-tenant config provider used to access the blocks bucket. + blockPurger, err := purger.NewBlocksPurgerAPI(t.Cfg.BlocksStorage, t.Overrides, util_log.Logger, prometheus.DefaultRegisterer, t.Cfg.PurgerConfig.DeleteRequestCancelPeriod) + if err != nil { + return nil, err + } + + t.API.RegisterBlocksPurger(blockPurger) + return nil, nil +} + func (t *Cortex) initQueryScheduler() (services.Service, error) { s, err := scheduler.NewScheduler(t.Cfg.QueryScheduler, t.Overrides, util_log.Logger, prometheus.DefaultRegisterer) if err != nil { @@ -869,6 +885,7 @@ func (t *Cortex) setupModuleManager() error { mm.RegisterModule(StoreGateway, t.initStoreGateway) mm.RegisterModule(ChunksPurger, t.initChunksPurger, modules.UserInvisibleModule) mm.RegisterModule(TenantDeletion, t.initTenantDeletionAPI, modules.UserInvisibleModule) + mm.RegisterModule(BlocksPurger, t.initBlocksPurger, modules.UserInvisibleModule) mm.RegisterModule(Purger, nil) mm.RegisterModule(QueryScheduler, t.initQueryScheduler) mm.RegisterModule(TenantFederation, t.initTenantFederation, modules.UserInvisibleModule) @@ -903,7 +920,8 @@ func (t *Cortex) setupModuleManager() error { StoreGateway: {API, Overrides, MemberlistKV}, ChunksPurger: {Store, DeleteRequestsStore, API}, TenantDeletion: {Store, API, Overrides}, - Purger: {ChunksPurger, TenantDeletion}, + BlocksPurger: {Store, API, Overrides}, + Purger: {ChunksPurger, TenantDeletion, BlocksPurger}, TenantFederation: {Queryable}, All: {QueryFrontend, Querier, Ingester, Distributor, TableManager, Purger, StoreGateway, Ruler}, } diff --git a/pkg/storage/tsdb/tombstones.go b/pkg/storage/tsdb/tombstones.go new file mode 100644 index 00000000000..a55bf071075 --- /dev/null +++ b/pkg/storage/tsdb/tombstones.go @@ -0,0 +1,356 @@ +package tsdb + +import ( + "bytes" + "context" + "encoding/json" + "path" + "path/filepath" + "time" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + "github.com/pkg/errors" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/promql/parser" + "github.com/thanos-io/thanos/pkg/objstore" + + "github.com/cortexproject/cortex/pkg/storage/bucket" + util_log "github.com/cortexproject/cortex/pkg/util/log" +) + +type BlockDeleteRequestState string + +const ( + StatePending BlockDeleteRequestState = "pending" + StateProcessed
BlockDeleteRequestState = "processed" + StateCancelled BlockDeleteRequestState = "deleted" +) + +// Relative to user-specific prefix. +const TombstonePath = "tombstones/" + +var ( + ErrTombstoneAlreadyExists = errors.New("The deletion tombstone with the same request information already exists") + ErrInvalidDeletionRequestState = errors.New("Tombstone filename extension indicating the deletion request state is invalid") + ErrTombstoneNotFound = errors.New("Tombstone not found in the object store") + ErrTombstoneDecode = errors.New("Unable to read tombstone contents from file") + AllDeletionStates = []BlockDeleteRequestState{StatePending, StateProcessed, StateCancelled} +) + +type Tombstone struct { + RequestCreatedAt int64 `json:"request_created_at"` + StateCreatedAt int64 `json:"state_created_at"` + RequestID string `json:"request_id"` + StartTime int64 `json:"start_time"` + EndTime int64 `json:"end_time"` + Selectors []string `json:"selectors"` + Matchers []*labels.Matcher `json:"-"` + UserID string `json:"user_id"` + State BlockDeleteRequestState `json:"state"` +} + +func NewTombstone(userID string, requestTime int64, stateTime int64, startTime int64, endTime int64, selectors []string, requestID string, state BlockDeleteRequestState) *Tombstone { + return &Tombstone{ + RequestCreatedAt: requestTime, + StateCreatedAt: stateTime, + StartTime: startTime, + EndTime: endTime, + Selectors: selectors, + UserID: userID, + RequestID: requestID, + State: state, + } +} + +// TombstoneManager is responsible for reading and writing tombstone files to the bucket. +type TombstoneManager struct { + bkt objstore.InstrumentedBucket + logger log.Logger +} + +func NewTombstoneManager( + bkt objstore.Bucket, + userID string, + cfgProvider bucket.TenantConfigProvider, + logger log.Logger) *TombstoneManager { + + return &TombstoneManager{ + bkt: bucket.NewUserBucketClient(userID, bkt, cfgProvider), + logger: util_log.WithUserID(userID, logger), + } +} + +// WriteTombstone uploads a tombstone file to the object store. +func (m *TombstoneManager) WriteTombstone(ctx context.Context, tombstone *Tombstone) error { + data, err := json.Marshal(tombstone) + if err != nil { + return errors.Wrap(err, "serialize tombstone") + } + + fullTombstonePath := path.Join(TombstonePath, getTombstoneFileName(tombstone.RequestID, tombstone.State)) + + // Check if the tombstone already exists for the same state. Could be the case the same request was made + // and is already in the middle of deleting series.
Creating a new tombstone would restart + // the progress + tombstoneExists, err := m.TombstoneExists(ctx, tombstone.RequestID, tombstone.State) + if err != nil { + level.Error(m.logger).Log("msg", "unable to check if the same tombstone already exists", "requestID", tombstone.RequestID, "err", err) + } else if tombstoneExists { + return ErrTombstoneAlreadyExists + } + + return errors.Wrap(m.bkt.Upload(ctx, fullTombstonePath, bytes.NewReader(data)), "upload tombstone") +} + +func (m *TombstoneManager) TombstoneExists(ctx context.Context, requestID string, state BlockDeleteRequestState) (bool, error) { + fullTombstonePath := path.Join(TombstonePath, getTombstoneFileName(requestID, state)) + exists, err := m.bkt.Exists(ctx, fullTombstonePath) + + if exists || err != nil { + return exists, err + } + + return false, nil +} + +func (m *TombstoneManager) GetTombstoneByIDForUser(ctx context.Context, requestID string) (*Tombstone, error) { + found := []*Tombstone{} + + for _, state := range AllDeletionStates { + filename := getTombstoneFileName(requestID, state) + + t, err := m.ReadTombstone(ctx, path.Join(TombstonePath, filename)) + if errors.Is(err, ErrTombstoneNotFound) { + continue + } else if err != nil { + return nil, err + } + found = append(found, t) + } + + if len(found) == 0 { + return nil, nil + } + + // If there are multiple tombstones with the same request id but different state, want to return only the latest one + // The older states will be cleaned up by the compactor (TODO future PR). + return found[len(found)-1], nil + +} + +func (m *TombstoneManager) GetAllTombstonesForUser(ctx context.Context) ([]*Tombstone, error) { + // add all the tombstones to a map and check for duplicates: if a key exists with the same request ID (but two different states). + discovered := make(map[string]BlockDeleteRequestState) + err := m.bkt.Iter(ctx, TombstonePath, func(s string) error { + requestID, newerState, err := GetStateAndRequestIDFromTombstonePath(filepath.Base(s)) + if err != nil { + return err + } + + if prevState, exists := discovered[requestID]; !exists { + discovered[requestID] = newerState + } else { + // If there is more than one tombstone for a given request, we only want the latest state. + // States can move only in a specific direction, so the later state will always win. + newerStateOrder, err := newerState.getStateOrder() + if err != nil { + return err + } + prevStateOrder, err := prevState.getStateOrder() + if err != nil { + return err + } + + // If the newer state found comes later in the order, then we replace the state in the map. + if newerStateOrder > prevStateOrder { + discovered[requestID] = newerState + } + } + return nil + }) + if err != nil { + return nil, err + } + + out := []*Tombstone{} + for id, state := range discovered { + filename := getTombstoneFileName(id, state) + t, err := m.ReadTombstone(ctx, path.Join(TombstonePath, filename)) + if errors.Is(err, ErrTombstoneNotFound) { + // This could happen if the tombstone state changes and the tombstone file is deleted in between the "list objects" and now. 
+ level.Warn(m.logger).Log("msg", "skipped missing tombstone file when loading all the tombstones", "requestID", id, "state", string(state)) + continue + } + if errors.Is(err, ErrTombstoneDecode) { + level.Error(m.logger).Log("msg", "skipped corrupted tombstone file when loading all the tombstones", "requestID", id, "state", state, "err", err) + continue + } + if err != nil { + return nil, err + } + out = append(out, t) + } + return out, nil +} + +func (m *TombstoneManager) ReadTombstone(ctx context.Context, tombstonePath string) (*Tombstone, error) { + _, _, err := GetStateAndRequestIDFromTombstonePath(tombstonePath) + if err != nil { + return nil, errors.Wrapf(err, "failed to get the requestID and state from filename: %s", tombstonePath) + } + + r, err := m.bkt.Get(ctx, tombstonePath) + if m.bkt.IsObjNotFoundErr(err) { + return nil, errors.Wrapf(ErrTombstoneNotFound, "tombstone file not found %s", tombstonePath) + } + if err != nil { + return nil, errors.Wrapf(err, "failed to read tombstone object: %s", tombstonePath) + } + + tombstone := &Tombstone{} + err = json.NewDecoder(r).Decode(tombstone) + + // Close reader before dealing with decode error. + if closeErr := r.Close(); closeErr != nil { + level.Warn(util_log.Logger).Log("msg", "failed to close bucket reader", "err", closeErr) + } + + if err != nil { + return nil, errors.Wrapf(ErrTombstoneDecode, "failed to decode tombstone object: %s, err: %v", tombstonePath, err.Error()) + } + + tombstone.Matchers, err = ParseMatchers(tombstone.Selectors) + if err != nil { + return nil, errors.Wrapf(err, "failed to parse tombstone selectors for: %s", tombstonePath) + } + + return tombstone, nil +} + +func (m *TombstoneManager) UpdateTombstoneState(ctx context.Context, t *Tombstone, newState BlockDeleteRequestState) (*Tombstone, error) { + // Create the new tombstone, and will delete the previous tombstone + newT := NewTombstone(t.UserID, t.RequestCreatedAt, time.Now().Unix()*1000, t.StartTime, t.EndTime, t.Selectors, t.RequestID, newState) + newT.Matchers = t.Matchers + + err := m.WriteTombstone(ctx, newT) + if err != nil { + level.Error(m.logger).Log("msg", "error creating file tombstone file with the updated state", "requestID", t.RequestID, "updated state", newState, "err", err) + return nil, err + } + + if err = m.DeleteTombstoneFile(ctx, t.RequestID, t.State); err != nil { + level.Error(m.logger).Log("msg", "Created file with updated state but unable to delete previous state. 
Will retry next time tombstones are loaded", "requestID", t.RequestID, "previous state", t.State, "updated state", newState, "err", err) + } + + return newT, nil +} + +func (m *TombstoneManager) DeleteTombstoneFile(ctx context.Context, requestID string, state BlockDeleteRequestState) error { + filename := getTombstoneFileName(requestID, state) + fullTombstonePath := path.Join(TombstonePath, filename) + + level.Info(m.logger).Log("msg", "Deleting tombstone file", "file", fullTombstonePath) + + return errors.Wrap(m.bkt.Delete(ctx, fullTombstonePath), "delete tombstone file") + +} + +func (m *TombstoneManager) RemoveCancelledStateIfExists(ctx context.Context, requestID string) error { + exists, err := m.TombstoneExists(ctx, requestID, StateCancelled) + if err != nil { + level.Error(m.logger).Log("msg", "unable to check if the request has previously been cancelled", "requestID", requestID, "err", err) + return err + } + + if exists { + if err = m.DeleteTombstoneFile(ctx, requestID, StateCancelled); err != nil { + level.Error(m.logger).Log("msg", "unable to delete tombstone with previously cancelled state", "requestID", requestID, "err", err) + return err + } + level.Info(m.logger).Log("msg", "Removed tombstone file with previously cancelled state", "requestID", requestID) + + } + return nil +} + +func GetStateAndRequestIDFromTombstonePath(tombstonePath string) (string, BlockDeleteRequestState, error) { + // The tombstone filename is expected to be of the form <request_id>.<state>.json + + // This should get the first extension which is .json + filenameExtension := filepath.Ext(tombstonePath) + filenameWithoutJSON := tombstonePath[0 : len(tombstonePath)-len(filenameExtension)] + + stateExtension := filepath.Ext(filenameWithoutJSON) + requestID := filenameWithoutJSON[0 : len(filenameWithoutJSON)-len(stateExtension)] + + // Ensure that the state exists as the filename extension + if len(stateExtension) == 0 { + return "", "", ErrInvalidDeletionRequestState + } + + state := BlockDeleteRequestState(stateExtension[1:]) + if !isValidDeleteRequestState(state) { + return "", "", errors.Wrapf(ErrInvalidDeletionRequestState, "Filename extension is invalid for tombstone: %s", tombstonePath) + + } + + return requestID, state, nil +} + +func ParseMatchers(selectors []string) ([]*labels.Matcher, error) { + // Convert the string selectors to label matchers + var m []*labels.Matcher + + for _, selector := range selectors { + parsed, err := parser.ParseMetricSelector(selector) + if err != nil { + return nil, errors.Wrapf(err, "error parsing metric selector") + } + // Keep the matchers in a 1D slice because the deletions are applied based + // on the "and" between all matchers. + m = append(m, parsed...) + } + + return m, nil +} + +func (t *Tombstone) GetFilename() string { + return t.RequestID + "." + string(t.State) + ".json" +} + +func (t *Tombstone) IsOverlappingInterval(minT int64, maxT int64) bool { + return t.StartTime <= maxT && minT < t.EndTime +} + +func (t *Tombstone) GetCreateTime() time.Time { + return time.Unix(t.RequestCreatedAt/1000, 0) +} + +func getTombstoneFileName(requestID string, state BlockDeleteRequestState) string { + return requestID + "."
+ string(state) + ".json" +} + +func isValidDeleteRequestState(state BlockDeleteRequestState) bool { + switch state { + case + StatePending, + StateProcessed, + StateCancelled: + return true + } + return false +} + +func (s BlockDeleteRequestState) getStateOrder() (int, error) { + switch s { + case StatePending: + return 0, nil + case StateProcessed: + return 2, nil + case StateCancelled: + return 3, nil + } + + return -1, ErrInvalidDeletionRequestState +} diff --git a/pkg/storage/tsdb/tombstones_test.go b/pkg/storage/tsdb/tombstones_test.go new file mode 100644 index 00000000000..b7ec019f384 --- /dev/null +++ b/pkg/storage/tsdb/tombstones_test.go @@ -0,0 +1,270 @@ +package tsdb + +import ( + "bytes" + "context" + "path" + "testing" + + "github.com/go-kit/kit/log" + "github.com/stretchr/testify/require" + "github.com/thanos-io/thanos/pkg/objstore" + "github.com/weaveworks/common/user" +) + +func TestTombstones_WritingSameTombstoneTwiceShouldFail(t *testing.T) { + + username := "user" + requestID := "requestID" + + bkt := objstore.NewInMemBucket() + + ctx := context.Background() + ctx = user.InjectOrgID(ctx, username) + + tManager := NewTombstoneManager(bkt, username, nil, log.NewNopLogger()) + + //create the tombstone + tombstone := NewTombstone(username, 0, 0, 0, 1, []string{"match"}, requestID, StatePending) + err := tManager.WriteTombstone(ctx, tombstone) + require.NoError(t, err) + + filename := requestID + "." + string(StatePending) + ".json" + exists, _ := bkt.Exists(ctx, path.Join(username, TombstonePath, filename)) + require.True(t, exists) + + // Creating the same tombstone twice should result in an error + err = tManager.WriteTombstone(ctx, tombstone) + require.ErrorIs(t, err, ErrTombstoneAlreadyExists) + +} + +func TestTombstonesExists(t *testing.T) { + const username = "user" + const requestID = "requestID" + + for name, tc := range map[string]struct { + objects map[string][]byte + targetRequestState BlockDeleteRequestState + exists bool + }{ + "no tombstones exist": { + objects: nil, + targetRequestState: StatePending, + exists: false, + }, + + "tombstone exists but different state": { + objects: map[string][]byte{ + username + "/tombstones/" + requestID + "." + string(StateProcessed) + ".json": []byte("data"), + username + "/tombstones/" + requestID + "." + string(StateCancelled) + ".json": []byte("data"), + }, + targetRequestState: StatePending, + exists: false, + }, + + "tombstone exists with correct state": { + objects: map[string][]byte{ + username + "/tombstones/" + requestID + "." + string(StatePending) + ".json": []byte("data"), + }, + targetRequestState: StatePending, + exists: true, + }, + } { + t.Run(name, func(t *testing.T) { + ctx := context.Background() + ctx = user.InjectOrgID(ctx, username) + + bkt := objstore.NewInMemBucket() + // "upload" sample tombstone files + for objName, data := range tc.objects { + require.NoError(t, bkt.Upload(context.Background(), objName, bytes.NewReader(data))) + } + tManager := NewTombstoneManager(bkt, username, nil, log.NewNopLogger()) + + res, err := tManager.TombstoneExists(ctx, requestID, tc.targetRequestState) + require.NoError(t, err) + require.Equal(t, tc.exists, res) + }) + } +} + +func TestTombstonesDeletion(t *testing.T) { + const username = "user" + const requestID = "requestID" + + ctx := context.Background() + ctx = user.InjectOrgID(ctx, username) + + tPending := NewTombstone(username, 0, 0, 0, 0, []string{}, requestID, StatePending) + + tPendingPath := username + "/tombstones/" + requestID + "." 
+ string(StatePending) + ".json" + tProcessedPath := username + "/tombstones/" + requestID + "." + string(StateProcessed) + ".json" + + bkt := objstore.NewInMemBucket() + // "upload" sample tombstone file + require.NoError(t, bkt.Upload(context.Background(), tPendingPath, bytes.NewReader([]byte("data")))) + require.NoError(t, bkt.Upload(context.Background(), tProcessedPath, bytes.NewReader([]byte("data")))) + + tManager := NewTombstoneManager(bkt, tPending.UserID, nil, log.NewNopLogger()) + require.NoError(t, tManager.DeleteTombstoneFile(ctx, tPending.RequestID, tPending.State)) + + // make sure the pending tombstone was deleted + exists, _ := bkt.Exists(ctx, tPendingPath) + require.False(t, exists) + + // the processed tombstone with the same requestID should still be in the bucket + exists, _ = bkt.Exists(ctx, tProcessedPath) + require.True(t, exists) + +} + +func TestTombstoneUpdateState(t *testing.T) { + const username = "user" + const requestID = "requestID" + + ctx := context.Background() + ctx = user.InjectOrgID(ctx, username) + + tPending := NewTombstone(username, 0, 0, 0, 0, []string{}, requestID, StatePending) + tPendingPath := username + "/tombstones/" + requestID + "." + string(StatePending) + ".json" + + bkt := objstore.NewInMemBucket() + tManager := NewTombstoneManager(bkt, username, nil, log.NewNopLogger()) + + // "upload" sample tombstone file + require.NoError(t, bkt.Upload(context.Background(), tPendingPath, bytes.NewReader([]byte("data")))) + + tProcessed, err := tManager.UpdateTombstoneState(ctx, tPending, StateProcessed) + require.NoError(t, err) + + // make sure the pending tombstone was deleted + exists, _ := bkt.Exists(ctx, tPendingPath) + require.False(t, exists) + + // check that the new tombstone with the updated state has been created + tProcessedPath := username + "/tombstones/" + tProcessed.RequestID + "." + string(tProcessed.State) + ".json" + exists, _ = bkt.Exists(ctx, tProcessedPath) + require.True(t, exists) +} + +func TestGetSingleTombstone(t *testing.T) { + const username = "user" + const requestID = "requestID" + + ctx := context.Background() + ctx = user.InjectOrgID(ctx, username) + + // When getting a specific request id, there could be a case where multiple + // files exist for the same request but with different extensions to indicate a + // different state. If thats the case, then the older state should be deleted. 
+ + // Add multiple tombstones with the same request id but different states + tPending := NewTombstone(username, 0, 0, 0, 0, []string{"node_exporter"}, requestID, StatePending) + tProcessed := NewTombstone(username, 10, 20, 30, 60, []string{"node_exporter"}, requestID, StateProcessed) + + bkt := objstore.NewInMemBucket() + tManager := NewTombstoneManager(bkt, tPending.UserID, nil, log.NewNopLogger()) + + // first add the tombstone files to the object store + require.NoError(t, tManager.WriteTombstone(ctx, tPending)) + require.NoError(t, tManager.WriteTombstone(ctx, tProcessed)) + + tRetrieved, err := tManager.GetTombstoneByIDForUser(ctx, requestID) + require.NoError(t, err) + + //verify that all the information was read correctly + require.Equal(t, tProcessed.StartTime, tRetrieved.StartTime) + require.Equal(t, tProcessed.EndTime, tRetrieved.EndTime) + require.Equal(t, tProcessed.RequestCreatedAt, tRetrieved.RequestCreatedAt) + require.Equal(t, tProcessed.StateCreatedAt, tRetrieved.StateCreatedAt) + require.Equal(t, tProcessed.Selectors, tRetrieved.Selectors) + require.Equal(t, tProcessed.RequestID, tRetrieved.RequestID) + require.Equal(t, tProcessed.UserID, tRetrieved.UserID) + require.Equal(t, tProcessed.State, tRetrieved.State) + + // Get single tombstone that doesn't exist should return nil + tRetrieved, err = tManager.GetTombstoneByIDForUser(ctx, "unknownRequestID") + require.NoError(t, err) + require.Nil(t, tRetrieved) +} + +func TestGetAllTombstones(t *testing.T) { + const username = "user" + ctx := context.Background() + ctx = user.InjectOrgID(ctx, username) + bkt := objstore.NewInMemBucket() + + tManager := NewTombstoneManager(bkt, username, nil, log.NewNopLogger()) + + tombstonesInput := []*Tombstone{ + NewTombstone(username, 0, 0, 0, 0, []string{}, "request1", StatePending), + NewTombstone(username, 0, 0, 0, 0, []string{}, "request1", StateCancelled), + NewTombstone(username, 0, 0, 0, 0, []string{}, "request2", StatePending), + NewTombstone(username, 0, 0, 0, 0, []string{}, "request3", StatePending), + NewTombstone(username, 0, 0, 0, 0, []string{}, "request4", StateProcessed), + NewTombstone(username, 0, 0, 0, 0, []string{}, "request5", StateCancelled), + NewTombstone(username, 0, 0, 0, 0, []string{}, "request6", StatePending), + NewTombstone(username, 0, 0, 0, 0, []string{}, "request6", StateProcessed), + } + + requiredOutput := map[string]BlockDeleteRequestState{ + "request1": StateCancelled, + "request2": StatePending, + "request3": StatePending, + "request4": StateProcessed, + "request5": StateCancelled, + "request6": StateProcessed, + } + + // add all tombstones to the bkt + for _, ts := range tombstonesInput { + require.NoError(t, tManager.WriteTombstone(ctx, ts)) + } + + tombstonesOutput, err := tManager.GetAllTombstonesForUser(ctx) + require.NoError(t, err) + + outputMap := make(map[string]BlockDeleteRequestState) + for _, ts := range tombstonesOutput { + _, exists := outputMap[ts.RequestID] + // There should not be more than one ts for each request id + require.False(t, exists) + + outputMap[ts.RequestID] = ts.State + } + + require.Equal(t, requiredOutput, outputMap) +} + +func TestTombstoneReadWithInvalidFileName(t *testing.T) { + const username = "user" + const requestID = "requestID" + bkt := objstore.NewInMemBucket() + + ctx := context.Background() + ctx = user.InjectOrgID(ctx, username) + + tManager := NewTombstoneManager(bkt, username, nil, log.NewNopLogger()) + + { + tInvalidPath := username + "/tombstones/" + requestID + "." 
+ string(StatePending) + _, err := tManager.ReadTombstone(ctx, tInvalidPath) + + require.ErrorIs(t, err, ErrInvalidDeletionRequestState) + } + + { + tInvalidPath := username + "/tombstones/" + requestID + _, err := tManager.ReadTombstone(ctx, tInvalidPath) + + require.ErrorIs(t, err, ErrInvalidDeletionRequestState) + } + + { + tInvalidPath := username + "/tombstones/" + requestID + ".json." + string(StatePending) + _, err := tManager.ReadTombstone(ctx, tInvalidPath) + require.ErrorIs(t, err, ErrInvalidDeletionRequestState) + } + +}
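Example usage (an illustrative sketch under assumed settings, not something this diff adds): with the blocks storage engine and `-purger.enable=true`, the endpoints registered in pkg/api/api.go can be exercised with plain HTTP calls. The Cortex address (localhost:9009), the default "/prometheus" HTTP prefix, the tenant ID "tenant-1", and the selector and timestamps below are assumptions for illustration only.

package main

import (
	"fmt"
	"io"
	"net/http"
	"net/url"
)

func main() {
	// Assumed Cortex address and default Prometheus HTTP prefix.
	base := "http://localhost:9009/prometheus/api/v1/admin/tsdb"

	// Create a series deletion request for one selector over a time range.
	params := url.Values{}
	params.Set("match[]", `node_cpu_seconds_total{job="node"}`)
	params.Set("start", "1609459200") // assumed Unix timestamps (seconds)
	params.Set("end", "1609545600")

	req, err := http.NewRequest(http.MethodPost, base+"/delete_series?"+params.Encode(), nil)
	if err != nil {
		panic(err)
	}
	req.Header.Set("X-Scope-OrgID", "tenant-1") // tenant ID header used by Cortex

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	resp.Body.Close()
	fmt.Println("create:", resp.Status) // 204 No Content on success

	// List all deletion requests (tombstones) for the tenant.
	listReq, err := http.NewRequest(http.MethodGet, base+"/delete_series", nil)
	if err != nil {
		panic(err)
	}
	listReq.Header.Set("X-Scope-OrgID", "tenant-1")

	listResp, err := http.DefaultClient.Do(listReq)
	if err != nil {
		panic(err)
	}
	defer listResp.Body.Close()
	body, _ := io.ReadAll(listResp.Body)
	fmt.Println("list:", listResp.Status, string(body)) // JSON array of tombstones
}

A request can later be cancelled (within `-purger.delete-request-cancel-period`) via a POST to ".../api/v1/admin/tsdb/cancel_delete_request?request_id=<id>", where the request_id values come from the GET listing above; each tombstone is stored in the bucket under <user>/tombstones/<request_id>.<state>.json.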