Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Block deletion endpoints #1

Open
wants to merge 28 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
f5deb2e
Add endpoints for blocks series deletion API
ilangofman Jul 13, 2021
220a47e
Add unit tests and address PR comments
ilangofman Jul 14, 2021
6f7d749
change 1 function name
ilangofman Jul 14, 2021
1f89173
Remove the deletion during get requests
ilangofman Jul 15, 2021
2feb3ce
make the hashing consistent no matter the selector order
ilangofman Jul 15, 2021
342d673
fix edge case
ilangofman Jul 18, 2021
361d166
Fix feature flag
ilangofman Jul 18, 2021
f1fc186
refactor minor
ilangofman Jul 19, 2021
f9c16f6
minor refactor
ilangofman Jul 19, 2021
8f0338f
Fix lint errors
ilangofman Jul 19, 2021
a5e62f0
fix lint errors and add changelog
ilangofman Jul 19, 2021
6836cfb
Merge branch 'master' into block_deletion_endpoints
ilangofman Jul 19, 2021
9d090a3
Latest changes from master branch
ilangofman Aug 6, 2021
16729d3
update changelog
ilangofman Aug 6, 2021
4bafb4e
fix Changelog
ilangofman Aug 6, 2021
1dfaf28
Remove extra print statement
ilangofman Aug 6, 2021
08b4c1e
Minor refactor
ilangofman Aug 16, 2021
b811d2e
Add a tombstone manager to simplify the creation/getting tombstones
ilangofman Aug 16, 2021
4aa7dd3
Merge branch 'master' into block_deletion_endpoints
ilangofman Aug 17, 2021
a0524cd
remove unneeded variable
ilangofman Aug 17, 2021
06aec3d
Merge branch 'block_deletion_endpoints' of https://github.com/ilangof…
ilangofman Aug 17, 2021
3b0ec49
fix comment
ilangofman Aug 17, 2021
212f876
Address PR comments
ilangofman Aug 19, 2021
7d6963d
reverse change of combining blocks purger and tenant deletion
ilangofman Sep 20, 2021
fd11ec1
Undo change of combining tenant deletion and series deletion API files.
ilangofman Sep 20, 2021
d9c5b92
Merge branch 'master' into block_deletion_endpoints
Jul 11, 2022
9263ca6
Make changes based on PR comments
Jul 13, 2022
06cb921
fix return content type of get delete requests
Jul 13, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 14 additions & 3 deletions pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,9 +272,20 @@ func (a *API) RegisterChunksPurger(store *purger.DeleteStore, deleteRequestCance
a.RegisterRoute(path.Join(a.cfg.LegacyHTTPPrefix, "/api/v1/admin/tsdb/cancel_delete_request"), http.HandlerFunc(deleteRequestHandler.CancelDeleteRequestHandler), true, "PUT", "POST")
}

func (a *API) RegisterTenantDeletion(api *purger.TenantDeletionAPI) {
a.RegisterRoute("/purger/delete_tenant", http.HandlerFunc(api.DeleteTenant), true, "POST")
a.RegisterRoute("/purger/delete_tenant_status", http.HandlerFunc(api.DeleteTenantStatus), true, "GET")
func (a *API) RegisterBlocksPurger(blocksPurger *purger.BlocksPurgerAPI) {

a.RegisterRoute(path.Join(a.cfg.PrometheusHTTPPrefix, "/api/v1/admin/tsdb/delete_series"), http.HandlerFunc(blocksPurger.AddDeleteRequestHandler), true, "PUT", "POST")
a.RegisterRoute(path.Join(a.cfg.PrometheusHTTPPrefix, "/api/v1/admin/tsdb/delete_series"), http.HandlerFunc(blocksPurger.GetAllDeleteRequestsHandler), true, "GET")
a.RegisterRoute(path.Join(a.cfg.PrometheusHTTPPrefix, "/api/v1/admin/tsdb/cancel_delete_request"), http.HandlerFunc(blocksPurger.CancelDeleteRequestHandler), true, "PUT", "POST")

// Legacy Routes
a.RegisterRoute(path.Join(a.cfg.LegacyHTTPPrefix, "/api/v1/admin/tsdb/delete_series"), http.HandlerFunc(blocksPurger.AddDeleteRequestHandler), true, "PUT", "POST")
a.RegisterRoute(path.Join(a.cfg.LegacyHTTPPrefix, "/api/v1/admin/tsdb/delete_series"), http.HandlerFunc(blocksPurger.AddDeleteRequestHandler), true, "GET")
a.RegisterRoute(path.Join(a.cfg.LegacyHTTPPrefix, "/api/v1/admin/tsdb/cancel_delete_request"), http.HandlerFunc(blocksPurger.CancelDeleteRequestHandler), true, "PUT", "POST")

// Tenant Deletion
a.RegisterRoute("/purger/delete_tenant", http.HandlerFunc(blocksPurger.DeleteTenant), true, "POST")
a.RegisterRoute("/purger/delete_tenant_status", http.HandlerFunc(blocksPurger.DeleteTenantStatus), true, "GET")
}

// RegisterRuler registers routes associated with the Ruler service.
Expand Down
312 changes: 312 additions & 0 deletions pkg/chunk/purger/blocks_purger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,312 @@
package purger

import (
"context"
"crypto/md5"
"encoding/hex"
"encoding/json"
fmt "fmt"
"net/http"
"strconv"
strings "strings"
"time"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/promql/parser"
"github.com/thanos-io/thanos/pkg/objstore"

"github.com/cortexproject/cortex/pkg/storage/bucket"
cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb"
"github.com/cortexproject/cortex/pkg/tenant"
"github.com/cortexproject/cortex/pkg/util"
util_log "github.com/cortexproject/cortex/pkg/util/log"
)

type BlocksPurgerAPI struct {
bucketClient objstore.Bucket
logger log.Logger
cfgProvider bucket.TenantConfigProvider
deleteRequestCancelPeriod time.Duration
}

func NewBlocksPurgerAPI(storageCfg cortex_tsdb.BlocksStorageConfig, cfgProvider bucket.TenantConfigProvider, logger log.Logger, reg prometheus.Registerer, cancellationPeriod time.Duration) (*BlocksPurgerAPI, error) {
bucketClient, err := createBucketClient(storageCfg, logger, reg)
if err != nil {
return nil, err
}

return newBlocksPurgerAPI(bucketClient, cfgProvider, logger, cancellationPeriod), nil
}

func newBlocksPurgerAPI(bkt objstore.Bucket, cfgProvider bucket.TenantConfigProvider, logger log.Logger, cancellationPeriod time.Duration) *BlocksPurgerAPI {
ilangofman marked this conversation as resolved.
Show resolved Hide resolved
return &BlocksPurgerAPI{
bucketClient: bkt,
cfgProvider: cfgProvider,
logger: logger,
deleteRequestCancelPeriod: cancellationPeriod,
}
}

func createBucketClient(cfg cortex_tsdb.BlocksStorageConfig, logger log.Logger, reg prometheus.Registerer) (objstore.Bucket, error) {
bucketClient, err := bucket.NewClient(context.Background(), cfg.Bucket, "purger", logger, reg)
if err != nil {
return nil, errors.Wrap(err, "create bucket client")
}

return bucketClient, nil
}

func (api *BlocksPurgerAPI) AddDeleteRequestHandler(w http.ResponseWriter, r *http.Request) {

ctx := r.Context()
userID, err := tenant.TenantID(ctx)
if err != nil {
http.Error(w, err.Error(), http.StatusUnauthorized)
return
}

params := r.URL.Query()
match := params["match[]"]
if len(match) == 0 {
http.Error(w, "selectors not set", http.StatusBadRequest)
return
}

for i := range match {
_, err := parser.ParseMetricSelector(match[i])
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
}

startParam := params.Get("start")
startTime := int64(0)
if startParam != "" {
startTime, err = util.ParseTime(startParam)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
}

endParam := params.Get("end")
endTime := int64(model.Now())

if endParam != "" {
endTime, err = util.ParseTime(endParam)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}

if endTime > int64(model.Now()) {
http.Error(w, "deletes in future not allowed", http.StatusBadRequest)
return
}
}

if startTime > endTime {
http.Error(w, "start time can't be greater than end time", http.StatusBadRequest)
return
}

requestId := getTombstoneRequestID(startTime, endTime, match)
curTime := time.Now().Unix() * 1000
t := cortex_tsdb.NewTombstone(userID, curTime, curTime, startTime, endTime, match, requestId, cortex_tsdb.StatePending)

if err = cortex_tsdb.WriteTombstoneFile(ctx, api.bucketClient, userID, api.cfgProvider, t); err != nil {

if err == cortex_tsdb.ErrTombstoneAlreadyExists {
level.Error(util_log.Logger).Log("msg", "delete request tombstone with same information already exists", "err", err)
http.Error(w, err.Error(), http.StatusBadRequest)
return
}

level.Error(util_log.Logger).Log("msg", "error adding delete request to the store", "err", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

w.WriteHeader(http.StatusNoContent)
}

func (api *BlocksPurgerAPI) GetAllDeleteRequestsHandler(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
userID, err := tenant.TenantID(ctx)
if err != nil {
http.Error(w, err.Error(), http.StatusUnauthorized)
return
}

deleteRequests, err := cortex_tsdb.GetAllDeleteRequestsForUser(ctx, api.bucketClient, api.cfgProvider, userID)
if err != nil {
level.Error(util_log.Logger).Log("msg", "error getting delete requests from the block store", "err", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

if err := json.NewEncoder(w).Encode(deleteRequests); err != nil {
level.Error(util_log.Logger).Log("msg", "error marshalling response", "err", err)
http.Error(w, fmt.Sprintf("Error marshalling response: %v", err), http.StatusInternalServerError)
}

w.WriteHeader(http.StatusOK)

}

func (api *BlocksPurgerAPI) CancelDeleteRequestHandler(w http.ResponseWriter, r *http.Request) {

ctx := r.Context()
userID, err := tenant.TenantID(ctx)
if err != nil {
http.Error(w, err.Error(), http.StatusUnauthorized)
return
}

params := r.URL.Query()
requestID := params.Get("request_id")

deleteRequest, err := cortex_tsdb.GetDeleteRequestByIdForUser(ctx, api.bucketClient, api.cfgProvider, userID, requestID)
if err != nil {
level.Error(util_log.Logger).Log("msg", "error getting delete request from the object store", "err", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

if deleteRequest == nil {
http.Error(w, "could not find delete request with given id", http.StatusBadRequest)
return
}

if deleteRequest.State == cortex_tsdb.StateCancelled {
http.Error(w, "the request has already been previously deleted", http.StatusBadRequest)
return
}

if deleteRequest.State == cortex_tsdb.StateProcessed {
http.Error(w, "deletion of request which is already processed is not allowed", http.StatusBadRequest)
return
}

currentTime := int64(time.Now().Unix() * 1000)
timeElapsed := float64(currentTime - deleteRequest.RequestCreatedAt)

if timeElapsed > float64(api.deleteRequestCancelPeriod.Milliseconds()) {
http.Error(w, fmt.Sprintf("deletion of request past the deadline of %s since its creation is not allowed", api.deleteRequestCancelPeriod.String()), http.StatusBadRequest)
return
}

// create file with the cancelled state
_, err = cortex_tsdb.UpdateTombstoneState(ctx, api.bucketClient, api.cfgProvider, deleteRequest, cortex_tsdb.StateCancelled)
if err != nil {
level.Error(util_log.Logger).Log("msg", "error cancelling the delete request", "err", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

w.WriteHeader(http.StatusNoContent)
}

// Calculates the tombstone file name based on a hash of the start time, end time and selectors
func getTombstoneRequestID(startTime int64, endTime int64, selectors []string) string {
ilangofman marked this conversation as resolved.
Show resolved Hide resolved
// First make a string of the tombstone info
var b strings.Builder
b.WriteString(strconv.FormatInt(startTime, 10))
b.WriteString(",")
b.WriteString(strconv.FormatInt(endTime, 10))

for _, s := range selectors {
b.WriteString(",")
b.WriteString(s)
}

return getTombstoneHash(b.String())
}

func getTombstoneHash(s string) string {
data := []byte(s)
md5Bytes := md5.Sum(data)
return hex.EncodeToString(md5Bytes[:])
}

func (api *BlocksPurgerAPI) DeleteTenant(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
userID, err := tenant.TenantID(ctx)
if err != nil {
// When Cortex is running, it uses Auth Middleware for checking X-Scope-OrgID and injecting tenant into context.
// Auth Middleware sends http.StatusUnauthorized if X-Scope-OrgID is missing, so we do too here, for consistency.
http.Error(w, err.Error(), http.StatusUnauthorized)
return
}

err = cortex_tsdb.WriteTenantDeletionMark(r.Context(), api.bucketClient, userID, api.cfgProvider, cortex_tsdb.NewTenantDeletionMark(time.Now()))
if err != nil {
level.Error(api.logger).Log("msg", "failed to write tenant deletion mark", "user", userID, "err", err)

http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

level.Info(api.logger).Log("msg", "tenant deletion mark in blocks storage created", "user", userID)

w.WriteHeader(http.StatusOK)
}

type DeleteTenantStatusResponse struct {
TenantID string `json:"tenant_id"`
BlocksDeleted bool `json:"blocks_deleted"`
}

func (api *BlocksPurgerAPI) DeleteTenantStatus(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
userID, err := tenant.TenantID(ctx)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}

result := DeleteTenantStatusResponse{}
result.TenantID = userID
result.BlocksDeleted, err = api.isBlocksForUserDeleted(ctx, userID)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

util.WriteJSONResponse(w, result)
}

func (api *BlocksPurgerAPI) isBlocksForUserDeleted(ctx context.Context, userID string) (bool, error) {
var errBlockFound = errors.New("block found")

userBucket := bucket.NewUserBucketClient(userID, api.bucketClient, api.cfgProvider)
err := userBucket.Iter(ctx, "", func(s string) error {
s = strings.TrimSuffix(s, "/")

_, err := ulid.Parse(s)
if err != nil {
// not block, keep looking
return nil
}

// Used as shortcut to stop iteration.
return errBlockFound
})

if errors.Is(err, errBlockFound) {
return false, nil
}

if err != nil {
return false, err
}

// No blocks found, all good.
return true, nil
}
Loading