diff --git a/pkg/chunk/purger/blocks_purger.go b/pkg/chunk/purger/blocks_purger.go index ee02b95164..7fa39b8358 100644 --- a/pkg/chunk/purger/blocks_purger.go +++ b/pkg/chunk/purger/blocks_purger.go @@ -158,7 +158,7 @@ func (api *BlocksPurgerAPI) GetAllDeleteRequestsHandler(w http.ResponseWriter, r return } tManager := cortex_tsdb.NewTombstoneManager(api.bucketClient, userID, api.cfgProvider, api.logger) - deleteRequests, err := tManager.GetAllDeleteRequestsForUser(ctx) + deleteRequests, err := tManager.GetAllDeleteRequestsForUser(ctx, nil) if err != nil { level.Error(util_log.Logger).Log("msg", "error getting delete requests from the block store", "err", err) http.Error(w, err.Error(), http.StatusInternalServerError) diff --git a/pkg/chunk/purger/tombstones.go b/pkg/chunk/purger/tombstones.go index fdf2cc0914..e8722b27c4 100644 --- a/pkg/chunk/purger/tombstones.go +++ b/pkg/chunk/purger/tombstones.go @@ -48,6 +48,14 @@ type TombstonesSet struct { oldestTombstoneStart, newestTombstoneEnd model.Time // Used as optimization to find whether we want to iterate over tombstones or not } +func NewTombstoneSet(t []DeleteRequest, start model.Time, end model.Time) *TombstonesSet { + return &TombstonesSet{ + tombstones: t, + oldestTombstoneStart: start, + newestTombstoneEnd: end, + } +} + // Used for easier injection of mocks. type DeleteStoreAPI interface { getCacheGenerationNumbers(ctx context.Context, user string) (*cacheGenNumbers, error) diff --git a/pkg/compactor/blocks_cleaner.go b/pkg/compactor/blocks_cleaner.go index 0ebd79aa83..cab614eec2 100644 --- a/pkg/compactor/blocks_cleaner.go +++ b/pkg/compactor/blocks_cleaner.go @@ -40,6 +40,7 @@ type BlocksCleaner struct { logger log.Logger bucketClient objstore.Bucket usersScanner *cortex_tsdb.UsersScanner + bucketCfg cortex_tsdb.BucketStoreConfig // Keep track of the last owned users. lastOwnedUsers []string @@ -58,10 +59,11 @@ type BlocksCleaner struct { tenantBucketIndexLastUpdate *prometheus.GaugeVec } -func NewBlocksCleaner(cfg BlocksCleanerConfig, bucketClient objstore.Bucket, usersScanner *cortex_tsdb.UsersScanner, cfgProvider ConfigProvider, logger log.Logger, reg prometheus.Registerer) *BlocksCleaner { +func NewBlocksCleaner(cfg BlocksCleanerConfig, bucketClient objstore.Bucket, usersScanner *cortex_tsdb.UsersScanner, cfgProvider ConfigProvider, bktCfg cortex_tsdb.BucketStoreConfig, logger log.Logger, reg prometheus.Registerer) *BlocksCleaner { c := &BlocksCleaner{ cfg: cfg, bucketClient: bucketClient, + bucketCfg: bktCfg, usersScanner: usersScanner, cfgProvider: cfgProvider, logger: log.With(logger, "component", "cleaner"), @@ -329,7 +331,7 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userID string, firstRun b } // Generate an updated in-memory version of the bucket index. 
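The NewTombstoneSet constructor exported above in pkg/chunk/purger/tombstones.go lets code outside the purger package (here, the querier) assemble a TombstonesSet directly. A minimal usage sketch with illustrative values, using the same DeleteRequest fields this patch uses in the querier:

```go
package main

import (
	"fmt"

	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/pkg/labels"

	"github.com/cortexproject/cortex/pkg/chunk/purger"
)

func main() {
	// One matcher set per delete request, matching the querier's usage in this patch.
	m := labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "http_requests_total")

	reqs := []purger.DeleteRequest{
		{StartTime: 0, EndTime: 15, Matchers: [][]*labels.Matcher{{m}}},
		{StartTime: 12, EndTime: 20, Matchers: [][]*labels.Matcher{{m}}},
	}

	// The start/end arguments bound the union of the request intervals.
	set := purger.NewTombstoneSet(reqs, model.Time(0), model.Time(20))
	fmt.Println(set.Len()) // 2; Len() is what blocks_store_queryable.go checks before wrapping.
}
```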
- w := bucketindex.NewUpdater(c.bucketClient, userID, c.cfgProvider, c.logger) + w := bucketindex.NewUpdater(c.bucketClient, userID, c.cfgProvider, c.cfg.DeletionDelay, c.cfg.CleanupInterval, c.bucketCfg, c.logger) idx, partials, err := w.UpdateIndex(ctx, idx) if err != nil { return err diff --git a/pkg/compactor/blocks_cleaner_test.go b/pkg/compactor/blocks_cleaner_test.go index 7576c6a6d8..d5d84aa30b 100644 --- a/pkg/compactor/blocks_cleaner_test.go +++ b/pkg/compactor/blocks_cleaner_test.go @@ -22,6 +22,7 @@ import ( "github.com/thanos-io/thanos/pkg/objstore" "github.com/cortexproject/cortex/pkg/storage/tsdb" + cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb" "github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex" cortex_testutil "github.com/cortexproject/cortex/pkg/storage/tsdb/testutil" "github.com/cortexproject/cortex/pkg/util" @@ -98,6 +99,16 @@ func testBlocksCleanerWithOptions(t *testing.T, options testBlocksCleanerOptions user4DebugMetaFile := path.Join("user-4", block.DebugMetas, "meta.json") require.NoError(t, bucketClient.Upload(context.Background(), user4DebugMetaFile, strings.NewReader("some random content here"))) + // create sample tombstones + tombstone1 := cortex_tsdb.NewTombstone("user-1", 100, 100, 0, 15, []string{"series1"}, "request1", cortex_tsdb.StatePending) + tombstone2 := cortex_tsdb.NewTombstone("user-1", 100, 100, 0, 15, []string{"series2"}, "request2", cortex_tsdb.StatePending) + tombstone3 := cortex_tsdb.NewTombstone("user-2", 100, 100, 0, 15, []string{"series1"}, "request3", cortex_tsdb.StatePending) + tombstone4 := cortex_tsdb.NewTombstone("user-2", 100, 100, 0, 15, []string{"series2"}, "request4", cortex_tsdb.StateCancelled) + uploadTombstone(t, bucketClient, "user-1", tombstone1) + uploadTombstone(t, bucketClient, "user-1", tombstone2) + uploadTombstone(t, bucketClient, "user-2", tombstone3) + uploadTombstone(t, bucketClient, "user-2", tombstone4) + // The fixtures have been created. If the bucket client wasn't wrapped to write // deletion marks to the global location too, then this is the right time to do it. if options.markersMigrationEnabled { @@ -117,7 +128,7 @@ func testBlocksCleanerWithOptions(t *testing.T, options testBlocksCleanerOptions scanner := tsdb.NewUsersScanner(bucketClient, tsdb.AllUsers, logger) cfgProvider := newMockConfigProvider() - cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, reg) + cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, newMockBucketStoreCfg(), logger, reg) require.NoError(t, services.StartAndAwaitRunning(ctx, cleaner)) defer services.StopAndAwaitTerminated(ctx, cleaner) //nolint:errcheck @@ -166,21 +177,26 @@ func testBlocksCleanerWithOptions(t *testing.T, options testBlocksCleanerOptions // Check the updated bucket index. 
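The positional NewTombstone arguments are easiest to read against the MockTombstone helper added later in this patch: userID, requestTime, stateTime, startTime, endTime, selectors, requestID, state. Spelled out for tombstone1 above:

```go
// Equivalent to the tombstone1 call above, with each positional argument named.
tombstone1 := cortex_tsdb.NewTombstone(
	"user-1",                 // userID
	100,                      // requestTime
	100,                      // stateTime
	0,                        // startTime (ms)
	15,                       // endTime (ms)
	[]string{"series1"},      // selectors
	"request1",               // requestID
	cortex_tsdb.StatePending, // state
)
```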
for _, tc := range []struct { - userID string - expectedIndex bool - expectedBlocks []ulid.ULID - expectedMarks []ulid.ULID + userID string + expectedIndex bool + expectedBlocks []ulid.ULID + expectedMarks []ulid.ULID + expectedTombstonesIDs []string }{ { - userID: "user-1", - expectedIndex: true, - expectedBlocks: []ulid.ULID{block1, block2 /* deleted: block3, block4, block5, partial: block6 */}, - expectedMarks: []ulid.ULID{block2}, + userID: "user-1", + expectedIndex: true, + expectedBlocks: []ulid.ULID{block1, block2 /* deleted: block3, block4, block5, partial: block6 */}, + expectedMarks: []ulid.ULID{block2}, + expectedTombstonesIDs: []string{"request1", "request2"}, }, { - userID: "user-2", - expectedIndex: true, - expectedBlocks: []ulid.ULID{block8}, - expectedMarks: []ulid.ULID{}, + userID: "user-2", + expectedIndex: true, + expectedBlocks: []ulid.ULID{block8}, + expectedMarks: []ulid.ULID{}, + expectedTombstonesIDs: []string{"request3"}, + // request4 should not be included because it is + // cancelled and should not be used for query filtering }, { userID: "user-3", expectedIndex: false, @@ -195,6 +211,7 @@ func testBlocksCleanerWithOptions(t *testing.T, options testBlocksCleanerOptions require.NoError(t, err) assert.ElementsMatch(t, tc.expectedBlocks, idx.Blocks.GetULIDs()) assert.ElementsMatch(t, tc.expectedMarks, idx.BlockDeletionMarks.GetULIDs()) + assert.ElementsMatch(t, tc.expectedTombstonesIDs, idx.Tombstones.GetRequestIDs()) } assert.NoError(t, prom_testutil.GatherAndCompare(reg, strings.NewReader(` @@ -251,7 +268,7 @@ func TestBlocksCleaner_ShouldContinueOnBlockDeletionFailure(t *testing.T) { scanner := tsdb.NewUsersScanner(bucketClient, tsdb.AllUsers, logger) cfgProvider := newMockConfigProvider() - cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, nil) + cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, newMockBucketStoreCfg(), logger, nil) require.NoError(t, services.StartAndAwaitRunning(ctx, cleaner)) defer services.StopAndAwaitTerminated(ctx, cleaner) //nolint:errcheck @@ -297,6 +314,10 @@ func TestBlocksCleaner_ShouldRebuildBucketIndexOnCorruptedOne(t *testing.T) { block3 := createTSDBBlock(t, bucketClient, userID, 30, 40, nil) createDeletionMark(t, bucketClient, userID, block2, now.Add(-deletionDelay).Add(-time.Hour)) createDeletionMark(t, bucketClient, userID, block3, now.Add(-deletionDelay).Add(time.Hour)) + tombstone1 := cortex_tsdb.NewTombstone(userID, 100, 100, 0, 15, []string{"series1"}, "request1", cortex_tsdb.StatePending) + tombstone2 := cortex_tsdb.NewTombstone(userID, 100, 100, 0, 15, []string{"series2"}, "request2", cortex_tsdb.StatePending) + uploadTombstone(t, bucketClient, userID, tombstone1) + uploadTombstone(t, bucketClient, userID, tombstone2) // Write a corrupted bucket index. 
require.NoError(t, bucketClient.Upload(ctx, path.Join(userID, bucketindex.IndexCompressedFilename), strings.NewReader("invalid!}"))) @@ -311,7 +332,7 @@ func TestBlocksCleaner_ShouldRebuildBucketIndexOnCorruptedOne(t *testing.T) { scanner := tsdb.NewUsersScanner(bucketClient, tsdb.AllUsers, logger) cfgProvider := newMockConfigProvider() - cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, nil) + cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, newMockBucketStoreCfg(), logger, nil) require.NoError(t, services.StartAndAwaitRunning(ctx, cleaner)) defer services.StopAndAwaitTerminated(ctx, cleaner) //nolint:errcheck @@ -339,6 +360,7 @@ func TestBlocksCleaner_ShouldRebuildBucketIndexOnCorruptedOne(t *testing.T) { require.NoError(t, err) assert.ElementsMatch(t, []ulid.ULID{block1, block3}, idx.Blocks.GetULIDs()) assert.ElementsMatch(t, []ulid.ULID{block3}, idx.BlockDeletionMarks.GetULIDs()) + assert.ElementsMatch(t, []string{"request1", "request2"}, idx.Tombstones.GetRequestIDs()) } func TestBlocksCleaner_ShouldRemoveMetricsForTenantsNotBelongingAnymoreToTheShard(t *testing.T) { @@ -362,7 +384,7 @@ func TestBlocksCleaner_ShouldRemoveMetricsForTenantsNotBelongingAnymoreToTheShar scanner := tsdb.NewUsersScanner(bucketClient, tsdb.AllUsers, logger) cfgProvider := newMockConfigProvider() - cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, reg) + cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, newMockBucketStoreCfg(), logger, reg) require.NoError(t, cleaner.cleanUsers(ctx, true)) assert.NoError(t, prom_testutil.GatherAndCompare(reg, strings.NewReader(` @@ -420,7 +442,7 @@ func TestBlocksCleaner_ListBlocksOutsideRetentionPeriod(t *testing.T) { id2 := createTSDBBlock(t, bucketClient, "user-1", 6000, 7000, nil) id3 := createTSDBBlock(t, bucketClient, "user-1", 7000, 8000, nil) - w := bucketindex.NewUpdater(bucketClient, "user-1", nil, logger) + w := bucketindex.NewUpdater(bucketClient, "user-1", nil, 0, 0, newMockBucketStoreCfg(), logger) idx, _, err := w.UpdateIndex(ctx, nil) require.NoError(t, err) @@ -493,7 +515,7 @@ func TestBlocksCleaner_ShouldRemoveBlocksOutsideRetentionPeriod(t *testing.T) { scanner := tsdb.NewUsersScanner(bucketClient, tsdb.AllUsers, logger) cfgProvider := newMockConfigProvider() - cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, reg) + cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, newMockBucketStoreCfg(), logger, reg) assertBlockExists := func(user string, block ulid.ULID, expectExists bool) { exists, err := bucketClient.Exists(ctx, path.Join(user, block.String(), metadata.MetaFilename)) @@ -681,3 +703,12 @@ func (m *mockConfigProvider) S3SSEKMSKeyID(userID string) string { func (m *mockConfigProvider) S3SSEKMSEncryptionContext(userID string) string { return "" } + +func newMockBucketStoreCfg() cortex_tsdb.BucketStoreConfig { + return cortex_tsdb.BucketStoreConfig{ + SyncInterval: time.Minute, + BucketIndex: cortex_tsdb.BucketIndexConfig{ + MaxStalePeriod: time.Hour, + }, + } +} diff --git a/pkg/compactor/compactor.go b/pkg/compactor/compactor.go index 79dc24d830..7a9a2f2df5 100644 --- a/pkg/compactor/compactor.go +++ b/pkg/compactor/compactor.go @@ -360,7 +360,7 @@ func (c *Compactor) starting(ctx context.Context) error { CleanupConcurrency: c.compactorCfg.CleanupConcurrency, BlockDeletionMarksMigrationEnabled: c.compactorCfg.BlockDeletionMarksMigrationEnabled, TenantCleanupDelay: c.compactorCfg.TenantCleanupDelay, - }, 
c.bucketClient, c.usersScanner, c.cfgProvider, c.parentLogger, c.registerer) + }, c.bucketClient, c.usersScanner, c.cfgProvider, c.storageCfg.BucketStore, c.parentLogger, c.registerer) // Initialize the compactors ring if sharding is enabled. if c.compactorCfg.ShardingEnabled { diff --git a/pkg/compactor/compactor_test.go b/pkg/compactor/compactor_test.go index 911c59aa26..1823cede2e 100644 --- a/pkg/compactor/compactor_test.go +++ b/pkg/compactor/compactor_test.go @@ -422,6 +422,7 @@ func TestCompactor_ShouldIncrementCompactionErrorIfFailedToCompactASingleTenant( bucketClient.MockIter("", []string{userID}, nil) bucketClient.MockIter(userID+"/", []string{userID + "/01DTVP434PA9VFXSW2JKB3392D"}, nil) bucketClient.MockIter(userID+"/markers/", nil, nil) + bucketClient.MockIter(userID+"/tombstones/", nil, nil) bucketClient.MockExists(path.Join(userID, cortex_tsdb.TenantDeletionMarkPath), false, nil) bucketClient.MockGet(userID+"/01DTVP434PA9VFXSW2JKB3392D/meta.json", mockBlockMetaJSON("01DTVP434PA9VFXSW2JKB3392D"), nil) bucketClient.MockGet(userID+"/01DTVP434PA9VFXSW2JKB3392D/deletion-mark.json", "", nil) @@ -476,6 +477,8 @@ func TestCompactor_ShouldIterateOverUsersAndRunCompaction(t *testing.T) { bucketClient.MockGet("user-2/bucket-index.json.gz", "", nil) bucketClient.MockIter("user-1/markers/", nil, nil) bucketClient.MockIter("user-2/markers/", nil, nil) + bucketClient.MockIter("user-1/tombstones/", nil, nil) + bucketClient.MockIter("user-2/tombstones/", nil, nil) bucketClient.MockUpload("user-1/bucket-index.json.gz", nil) bucketClient.MockUpload("user-2/bucket-index.json.gz", nil) @@ -605,6 +608,7 @@ func TestCompactor_ShouldNotCompactBlocksMarkedForDeletion(t *testing.T) { bucketClient.MockDelete("user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ", nil) bucketClient.MockGet("user-1/bucket-index.json.gz", "", nil) bucketClient.MockUpload("user-1/bucket-index.json.gz", nil) + bucketClient.MockIter("user-1/tombstones/", nil, nil) c, _, tsdbPlanner, logs, registry := prepare(t, cfg, bucketClient) @@ -713,6 +717,7 @@ func TestCompactor_ShouldNotCompactBlocksForUsersMarkedForDeletion(t *testing.T) bucketClient.MockDelete("user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", nil) bucketClient.MockDelete("user-1/01DTVP434PA9VFXSW2JKB3392D/index", nil) bucketClient.MockDelete("user-1/bucket-index.json.gz", nil) + bucketClient.MockIter("user-1/tombstones/", nil, nil) c, _, tsdbPlanner, logs, registry := prepare(t, cfg, bucketClient) @@ -808,6 +813,8 @@ func TestCompactor_ShouldCompactAllUsersOnShardingEnabledButOnlyOneInstanceRunni bucketClient.MockIter("user-2/", []string{"user-2/01DTW0ZCPDDNV4BV83Q2SV4QAZ"}, nil) bucketClient.MockIter("user-1/markers/", nil, nil) bucketClient.MockIter("user-2/markers/", nil, nil) + bucketClient.MockIter("user-1/tombstones/", nil, nil) + bucketClient.MockIter("user-2/tombstones/", nil, nil) bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", mockBlockMetaJSON("01DTVP434PA9VFXSW2JKB3392D"), nil) bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/deletion-mark.json", "", nil) bucketClient.MockGet("user-2/01DTW0ZCPDDNV4BV83Q2SV4QAZ/meta.json", mockBlockMetaJSON("01DTW0ZCPDDNV4BV83Q2SV4QAZ"), nil) @@ -886,6 +893,7 @@ func TestCompactor_ShouldCompactOnlyUsersOwnedByTheInstanceOnShardingEnabledAndM for _, userID := range userIDs { bucketClient.MockIter(userID+"/", []string{userID + "/01DTVP434PA9VFXSW2JKB3392D"}, nil) bucketClient.MockIter(userID+"/markers/", nil, nil) + bucketClient.MockIter(userID+"/tombstones/", nil, nil) bucketClient.MockExists(path.Join(userID, 
cortex_tsdb.TenantDeletionMarkPath), false, nil) bucketClient.MockGet(userID+"/01DTVP434PA9VFXSW2JKB3392D/meta.json", mockBlockMetaJSON("01DTVP434PA9VFXSW2JKB3392D"), nil) bucketClient.MockGet(userID+"/01DTVP434PA9VFXSW2JKB3392D/deletion-mark.json", "", nil) @@ -1031,6 +1039,15 @@ func createDeletionMark(t *testing.T, bkt objstore.Bucket, userID string, blockI require.NoError(t, bkt.Upload(context.Background(), markPath, strings.NewReader(content))) } +func uploadTombstone(t *testing.T, bkt objstore.Bucket, userID string, tombstone *cortex_tsdb.Tombstone) { + tombstoneFilename := tombstone.GetFilename() + path := path.Join(userID, cortex_tsdb.TombstonePath, tombstoneFilename) + data, err := json.Marshal(tombstone) + + require.NoError(t, err) + require.NoError(t, bkt.Upload(context.Background(), path, bytes.NewReader(data))) +} + func findCompactorByUserID(compactors []*Compactor, logs []*concurrency.SyncBuffer, userID string) (*Compactor, *concurrency.SyncBuffer, error) { var compactor *Compactor var log *concurrency.SyncBuffer diff --git a/pkg/querier/blocks_finder_bucket_index.go b/pkg/querier/blocks_finder_bucket_index.go index 368ea60420..3f29a39054 100644 --- a/pkg/querier/blocks_finder_bucket_index.go +++ b/pkg/querier/blocks_finder_bucket_index.go @@ -2,14 +2,19 @@ package querier import ( "context" + "math" "time" + cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb" "github.com/go-kit/kit/log" "github.com/oklog/ulid" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/pkg/labels" "github.com/thanos-io/thanos/pkg/objstore" + "github.com/cortexproject/cortex/pkg/chunk/purger" "github.com/cortexproject/cortex/pkg/storage/bucket" "github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex" "github.com/cortexproject/cortex/pkg/util/services" @@ -107,3 +112,70 @@ func (f *BucketIndexBlocksFinder) GetBlocks(ctx context.Context, userID string, return blocks, matchingDeletionMarks, nil } + +func (f *BucketIndexBlocksFinder) GetTombstones(ctx context.Context, userID string, minT int64, maxT int64) (*purger.TombstonesSet, error) { + if f.State() != services.Running { + return nil, errBucketIndexBlocksFinderNotRunning + } + if maxT < minT { + return nil, errInvalidBlocksRange + } + + // Get the bucket index for this user. + idx, err := f.loader.GetIndex(ctx, userID) + if errors.Is(err, bucketindex.ErrIndexNotFound) { + // This is a legit edge case, happening when a new tenant has not shipped blocks to the storage yet + // so the bucket index hasn't been created yet. + return nil, nil + } + if err != nil { + return nil, err + } + + // Ensure the bucket index is not too old. 
+ if time.Since(idx.GetUpdatedAt()) > f.cfg.MaxStalePeriod { + return nil, errBucketIndexTooOld + } + + tConverted := []purger.DeleteRequest{} + var tMinTime int64 = math.MaxInt64 + var tMaxTime int64 = math.MinInt64 + for _, t := range idx.Tombstones { + if !t.IsOverlappingInterval(minT, maxT) { + continue + } + + // Convert the tombstone into a delete request, which was originally implemented for chunk-store deletion. + // This allows much of the query-time filtering code to be shared between the block and chunk stores. + matchers, err := cortex_tsdb.ParseMatchers(t.Selectors) + if err != nil { + return nil, errors.Wrapf(err, "failed to parse tombstone selectors for: %s", t.RequestID) + } + + request := purger.DeleteRequest{ + StartTime: model.Time(t.StartTime), + EndTime: model.Time(t.EndTime), + Matchers: [][]*labels.Matcher{matchers}, + } + tConverted = append(tConverted, request) + + if t.StartTime < tMinTime { + tMinTime = t.StartTime + } + if t.EndTime > tMaxTime { + tMaxTime = t.EndTime + } + } + + // Narrow the interval over which the tombstones will be applied, if possible. + if minT > tMinTime { + tMinTime = minT + } + if maxT < tMaxTime { + tMaxTime = maxT + } + tombstoneSet := purger.NewTombstoneSet(tConverted, model.Time(tMinTime), model.Time(tMaxTime)) + + return tombstoneSet, nil +} diff --git a/pkg/querier/blocks_finder_bucket_index_test.go b/pkg/querier/blocks_finder_bucket_index_test.go index 151ff2a3be..5a4ed76178 100644 --- a/pkg/querier/blocks_finder_bucket_index_test.go +++ b/pkg/querier/blocks_finder_bucket_index_test.go @@ -2,6 +2,7 @@ package querier import ( "context" + "math" "path" "strings" "testing" @@ -9,10 +10,14 @@ import ( "github.com/go-kit/kit/log" "github.com/oklog/ulid" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/pkg/labels" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/thanos-io/thanos/pkg/objstore" + "github.com/cortexproject/cortex/pkg/chunk/purger" + cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb" "github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex" cortex_testutil "github.com/cortexproject/cortex/pkg/storage/tsdb/testutil" "github.com/cortexproject/cortex/pkg/util/services" @@ -153,7 +158,104 @@ func BenchmarkBucketIndexBlocksFinder_GetBlocks(b *testing.B) { } } -func TestBucketIndexBlocksFinder_GetBlocks_BucketIndexDoesNotExist(t *testing.T) { +func TestBucketIndexBlocksFinder_GetTombstones(t *testing.T) { + const userID = "user-1" + + ctx := context.Background() + bkt, _ := cortex_testutil.PrepareFilesystemBucket(t) + + // Mock a bucket index.
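The interval bookkeeping in GetTombstones above first grows [tMinTime, tMaxTime] to the union of all overlapping tombstone intervals, then narrows it back to the query range. A small sketch of the narrowing step, with worked numbers: tombstones [10, 15] and [12, 20] queried over [13, 16] give a union of [10, 20], clamped to [13, 16] — the "query range within multiple tombstones" case in the test below.

```go
// clampTombstoneRange mirrors the narrowing at the end of GetTombstones:
// the union of matching tombstone intervals never extends past the query.
func clampTombstoneRange(tMin, tMax, qMin, qMax int64) (int64, int64) {
	if qMin > tMin {
		tMin = qMin
	}
	if qMax < tMax {
		tMax = qMax
	}
	return tMin, tMax
}
```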
+ t1 := cortex_tsdb.NewTombstone(userID, 0, 0, 10, 15, []string{"series1"}, "request1", cortex_tsdb.StatePending) + t1.Matchers, _ = cortex_tsdb.ParseMatchers(t1.Selectors) + t2 := cortex_tsdb.NewTombstone(userID, 0, 0, 12, 20, []string{"series2"}, "request2", cortex_tsdb.StatePending) + t2.Matchers, _ = cortex_tsdb.ParseMatchers(t2.Selectors) + t3 := cortex_tsdb.NewTombstone(userID, 0, 0, 20, 30, []string{"series3"}, "request3", cortex_tsdb.StatePending) + t3.Matchers, _ = cortex_tsdb.ParseMatchers(t3.Selectors) + t4 := cortex_tsdb.NewTombstone(userID, 0, 0, 30, 40, []string{"series4"}, "request4", cortex_tsdb.StatePending) + t4.Matchers, _ = cortex_tsdb.ParseMatchers(t4.Selectors) + + require.NoError(t, bucketindex.WriteIndex(ctx, bkt, userID, nil, &bucketindex.Index{ + Version: bucketindex.IndexVersion1, + Tombstones: bucketindex.SeriesDeletionTombstones{t1, t2, t3, t4}, + UpdatedAt: time.Now().Unix(), + })) + + finder := prepareBucketIndexBlocksFinder(t, bkt) + + tests := map[string]struct { + minT int64 + maxT int64 + expectedTombstoneSet *purger.TombstonesSet + }{ + "no matching tombstones because the range is too low": { + minT: 0, + maxT: 5, + expectedTombstoneSet: purger.NewTombstoneSet([]purger.DeleteRequest{}, math.MaxInt64, math.MinInt64), + }, + "no matching tombstones because the range is too high": { + minT: 50, + maxT: 60, + expectedTombstoneSet: purger.NewTombstoneSet([]purger.DeleteRequest{}, math.MaxInt64, math.MinInt64), + }, + "matching all tombstones": { + minT: 0, + maxT: 60, + expectedTombstoneSet: purger.NewTombstoneSet([]purger.DeleteRequest{ + {StartTime: model.Time(t1.StartTime), EndTime: model.Time(t1.EndTime), Matchers: [][]*labels.Matcher{t1.Matchers}}, + {StartTime: model.Time(t2.StartTime), EndTime: model.Time(t2.EndTime), Matchers: [][]*labels.Matcher{t2.Matchers}}, + {StartTime: model.Time(t3.StartTime), EndTime: model.Time(t3.EndTime), Matchers: [][]*labels.Matcher{t3.Matchers}}, + {StartTime: model.Time(t4.StartTime), EndTime: model.Time(t4.EndTime), Matchers: [][]*labels.Matcher{t4.Matchers}}, + }, model.Time(t1.StartTime), model.Time(t4.EndTime)), + }, + "query range starting at a tombstone end time": { + minT: t3.EndTime, + maxT: 60, + expectedTombstoneSet: purger.NewTombstoneSet([]purger.DeleteRequest{ + {StartTime: model.Time(t4.StartTime), EndTime: model.Time(t4.EndTime), Matchers: [][]*labels.Matcher{t4.Matchers}}, + }, model.Time(t4.StartTime), model.Time(t4.EndTime)), + }, + "query range ending at a tombstone start time": { + minT: t3.StartTime, + maxT: t4.EndTime, + expectedTombstoneSet: purger.NewTombstoneSet([]purger.DeleteRequest{ + {StartTime: model.Time(t3.StartTime), EndTime: model.Time(t3.EndTime), Matchers: [][]*labels.Matcher{t3.Matchers}}, + {StartTime: model.Time(t4.StartTime), EndTime: model.Time(t4.EndTime), Matchers: [][]*labels.Matcher{t4.Matchers}}, + }, model.Time(t3.StartTime), model.Time(t4.EndTime)), + }, + "query range within a single tombstone": { + minT: t3.StartTime + 2, + maxT: t3.EndTime - 2, + expectedTombstoneSet: purger.NewTombstoneSet([]purger.DeleteRequest{ + {StartTime: model.Time(t3.StartTime), EndTime: model.Time(t3.EndTime), Matchers: [][]*labels.Matcher{t3.Matchers}}, + }, model.Time(t3.StartTime+2), model.Time(t3.EndTime-2)), + }, + "query range within multiple tombstones": { + minT: 13, + maxT: 16, + expectedTombstoneSet: purger.NewTombstoneSet([]purger.DeleteRequest{ + {StartTime: model.Time(t1.StartTime), EndTime: model.Time(t1.EndTime), Matchers: [][]*labels.Matcher{t1.Matchers}}, + {StartTime: 
model.Time(t2.StartTime), EndTime: model.Time(t2.EndTime), Matchers: [][]*labels.Matcher{t2.Matchers}}, + }, 13, 16), + }, + "query range matching exactly a single tombstone": { + minT: t3.StartTime, + maxT: t3.EndTime - 1, + expectedTombstoneSet: purger.NewTombstoneSet([]purger.DeleteRequest{ + {StartTime: model.Time(t3.StartTime), EndTime: model.Time(t3.EndTime), Matchers: [][]*labels.Matcher{t3.Matchers}}, + }, model.Time(t3.StartTime), model.Time(t3.EndTime-1)), + }, + } + + for testName, testData := range tests { + t.Run(testName, func(t *testing.T) { + tombstoneSet, err := finder.GetTombstones(ctx, userID, testData.minT, testData.maxT) + require.NoError(t, err) + require.Equal(t, testData.expectedTombstoneSet, tombstoneSet) + }) + } +} + +func TestBucketIndexBlocksFinder_BucketIndexDoesNotExist(t *testing.T) { const userID = "user-1" ctx := context.Background() @@ -164,9 +266,13 @@ require.NoError(t, err) assert.Empty(t, blocks) assert.Empty(t, deletionMarks) + + tombstoneSet, err := finder.GetTombstones(ctx, userID, 10, 20) + require.NoError(t, err) + assert.Empty(t, tombstoneSet) } -func TestBucketIndexBlocksFinder_GetBlocks_BucketIndexIsCorrupted(t *testing.T) { +func TestBucketIndexBlocksFinder_BucketIndexIsCorrupted(t *testing.T) { const userID = "user-1" ctx := context.Background() @@ -178,9 +284,12 @@ _, _, err := finder.GetBlocks(ctx, userID, 10, 20) require.Equal(t, bucketindex.ErrIndexCorrupted, err) + + _, err = finder.GetTombstones(ctx, userID, 10, 20) + require.Equal(t, bucketindex.ErrIndexCorrupted, err) } -func TestBucketIndexBlocksFinder_GetBlocks_BucketIndexIsTooOld(t *testing.T) { +func TestBucketIndexBlocksFinder_BucketIndexIsTooOld(t *testing.T) { const userID = "user-1" ctx := context.Background() @@ -196,6 +305,9 @@ _, _, err := finder.GetBlocks(ctx, userID, 10, 20) require.Equal(t, errBucketIndexTooOld, err) + + _, err = finder.GetTombstones(ctx, userID, 10, 20) + require.Equal(t, errBucketIndexTooOld, err) } func prepareBucketIndexBlocksFinder(t testing.TB, bkt objstore.Bucket) *BucketIndexBlocksFinder { diff --git a/pkg/querier/blocks_finder_bucket_scan.go b/pkg/querier/blocks_finder_bucket_scan.go index bfa2086313..ec92c2255b 100644 --- a/pkg/querier/blocks_finder_bucket_scan.go +++ b/pkg/querier/blocks_finder_bucket_scan.go @@ -20,6 +20,7 @@ import ( "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/objstore" + "github.com/cortexproject/cortex/pkg/chunk/purger" "github.com/cortexproject/cortex/pkg/storage/bucket" cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb" "github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex" @@ -150,6 +151,11 @@ func (d *BucketScanBlocksFinder) GetBlocks(_ context.Context, userID string, min return matchingMetas, matchingDeletionMarks, nil } +func (d *BucketScanBlocksFinder) GetTombstones(_ context.Context, _ string, _ int64, _ int64) (*purger.TombstonesSet, error) { + // Series-deletion query-time filtering is not supported unless the bucket index is enabled. + return nil, nil +} + func (d *BucketScanBlocksFinder) starting(ctx context.Context) error { // Before the service is in the running state it must have successfully // complete the initial scan.
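The scan-based finder deliberately returns a nil set rather than an error, so callers must treat nil as "no filtering possible". The querier change in the next file guards against both nil and empty sets before wrapping the merged series set; the pattern, lifted from blocks_store_queryable.go below:

```go
// Only wrap when a non-nil, non-empty tombstone set was returned.
if tombstoneSet != nil && tombstoneSet.Len() != 0 {
	seriesSet = series.NewDeletedSeriesSet(seriesSet, tombstoneSet,
		model.Interval{Start: model.Time(minT), End: model.Time(maxT)})
}
```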
diff --git a/pkg/querier/blocks_store_queryable.go b/pkg/querier/blocks_store_queryable.go index 69584f01ba..f9c7ce7a85 100644 --- a/pkg/querier/blocks_store_queryable.go +++ b/pkg/querier/blocks_store_queryable.go @@ -16,6 +16,7 @@ import ( "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/prometheus/common/model" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/storage" "github.com/thanos-io/thanos/pkg/block" @@ -27,6 +28,7 @@ import ( "golang.org/x/sync/errgroup" grpc_metadata "google.golang.org/grpc/metadata" + "github.com/cortexproject/cortex/pkg/chunk/purger" "github.com/cortexproject/cortex/pkg/cortexpb" "github.com/cortexproject/cortex/pkg/querier/series" "github.com/cortexproject/cortex/pkg/querier/stats" @@ -76,6 +78,9 @@ type BlocksFinder interface { // GetBlocks returns known blocks for userID containing samples within the range minT // and maxT (milliseconds, both included). Returned blocks are sorted by MaxTime descending. GetBlocks(ctx context.Context, userID string, minT, maxT int64) (bucketindex.Blocks, map[ulid.ULID]*bucketindex.BlockDeletionMark, error) + + // GetTombstones returns all the tombstones that are currently required for filtering deleted series. + GetTombstones(ctx context.Context, userID string, minT, maxT int64) (*purger.TombstonesSet, error) } // BlocksStoreClient is the interface that should be implemented by any client used @@ -415,8 +420,8 @@ func (q *blocksStoreQuerier) selectSorted(sp *storage.SelectHints, matchers ...* maxChunksLimit = q.limits.MaxChunksPerQueryFromStore(q.userID) leftChunksLimit = maxChunksLimit - - resultMtx sync.Mutex + tombstoneSet *purger.TombstonesSet + resultMtx sync.Mutex ) queryFunc := func(clients map[BlocksStoreClient][]ulid.ULID, minT, maxT int64) ([]ulid.ULID, error) { @@ -447,11 +452,21 @@ func (q *blocksStoreQuerier) selectSorted(sp *storage.SelectHints, matchers ...* if len(resSeriesSets) == 0 { storage.EmptySeriesSet() + } else { + tombstoneSet, err = q.finder.GetTombstones(spanCtx, q.userID, minT, maxT) + if err != nil { + return storage.ErrSeriesSet(err) + } } - return series.NewSeriesSetWithWarnings( - storage.NewMergeSeriesSet(resSeriesSets, storage.ChainedSeriesMerge), - resWarnings) + seriesSet := storage.NewMergeSeriesSet(resSeriesSets, storage.ChainedSeriesMerge) + + if tombstoneSet != nil && tombstoneSet.Len() != 0 { + seriesSet = series.NewDeletedSeriesSet(seriesSet, tombstoneSet, model.Interval{Start: model.Time(minT), End: model.Time(maxT)}) + } + + return series.NewSeriesSetWithWarnings(seriesSet, resWarnings) + } func (q *blocksStoreQuerier) queryWithConsistencyCheck(ctx context.Context, logger log.Logger, minT, maxT int64, diff --git a/pkg/querier/blocks_store_queryable_test.go b/pkg/querier/blocks_store_queryable_test.go index 40e868be4a..90ff7a7326 100644 --- a/pkg/querier/blocks_store_queryable_test.go +++ b/pkg/querier/blocks_store_queryable_test.go @@ -15,6 +15,7 @@ import ( "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/prometheus/common/model" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/storage" @@ -28,6 +29,7 @@ import ( "github.com/weaveworks/common/user" "google.golang.org/grpc" + "github.com/cortexproject/cortex/pkg/chunk/purger" "github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex" 
"github.com/cortexproject/cortex/pkg/storegateway/storegatewaypb" "github.com/cortexproject/cortex/pkg/util" @@ -44,14 +46,16 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { ) var ( - block1 = ulid.MustNew(1, nil) - block2 = ulid.MustNew(2, nil) - block3 = ulid.MustNew(3, nil) - block4 = ulid.MustNew(4, nil) - metricNameLabel = labels.Label{Name: labels.MetricName, Value: metricName} - series1Label = labels.Label{Name: "series", Value: "1"} - series2Label = labels.Label{Name: "series", Value: "2"} - noOpQueryLimiter = limiter.NewQueryLimiter(0, 0, 0) + block1 = ulid.MustNew(1, nil) + block2 = ulid.MustNew(2, nil) + block3 = ulid.MustNew(3, nil) + block4 = ulid.MustNew(4, nil) + metricNameLabel = labels.Label{Name: labels.MetricName, Value: metricName} + series1Label = labels.Label{Name: "series", Value: "1"} + series2Label = labels.Label{Name: "series", Value: "2"} + noOpQueryLimiter = limiter.NewQueryLimiter(0, 0, 0) + metricNameMatcher = labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, metricName) + seriesMatcher2 = labels.MustNewMatcher(labels.MatchEqual, "series", "2") ) type valueResult struct { @@ -73,6 +77,7 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { expectedSeries []seriesResult expectedErr error expectedMetrics string + tombstoneSet *purger.TombstonesSet }{ "no block in the storage matching the query time range": { finderResult: nil, @@ -584,6 +589,172 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { queryLimiter: limiter.NewQueryLimiter(0, 8, 0), expectedErr: validation.LimitError(fmt.Sprintf(limiter.ErrMaxChunkBytesHit, 8)), }, + "single tombstones exist and should filter the entire returned series": { + finderResult: bucketindex.Blocks{ + {ID: block1}, + {ID: block2}, + }, + storeSetResponses: []interface{}{ + map[BlocksStoreClient][]ulid.ULID{ + &storeGatewayClientMock{remoteAddr: "1.1.1.1", mockedSeriesResponses: []*storepb.SeriesResponse{ + mockSeriesResponse(labels.Labels{metricNameLabel}, minT+1, 2), + mockSeriesResponse(labels.Labels{metricNameLabel}, minT+2, 2), + mockSeriesResponse(labels.Labels{metricNameLabel}, minT+3, 2), + mockHintsResponse(block1), + }}: {block1}, + &storeGatewayClientMock{remoteAddr: "2.2.2.2", mockedSeriesResponses: []*storepb.SeriesResponse{ + mockSeriesResponse(labels.Labels{metricNameLabel}, minT, 1), + mockSeriesResponse(labels.Labels{metricNameLabel}, minT+1, 2), + mockHintsResponse(block2), + }}: {block2}, + }, + }, + limits: &blocksStoreLimitsMock{}, + queryLimiter: noOpQueryLimiter, + expectedSeries: []seriesResult{ + { + lbls: labels.New(metricNameLabel), + values: []valueResult(nil), + }, + }, + tombstoneSet: purger.NewTombstoneSet([]purger.DeleteRequest{ + {StartTime: model.Time(minT), EndTime: model.Time(minT + 3), Matchers: [][]*labels.Matcher{{metricNameMatcher}}}}, + model.Time(minT), model.Time(minT+3)), + }, + "single tombstones exist and should filter part of returned series": { + finderResult: bucketindex.Blocks{ + {ID: block1}, + {ID: block2}, + }, + storeSetResponses: []interface{}{ + map[BlocksStoreClient][]ulid.ULID{ + &storeGatewayClientMock{remoteAddr: "1.1.1.1", mockedSeriesResponses: []*storepb.SeriesResponse{ + mockSeriesResponse(labels.Labels{metricNameLabel}, minT+1, 2), + mockSeriesResponse(labels.Labels{metricNameLabel}, minT+2, 3), + mockSeriesResponse(labels.Labels{metricNameLabel}, minT+3, 4), + mockHintsResponse(block1), + }}: {block1}, + &storeGatewayClientMock{remoteAddr: "2.2.2.2", mockedSeriesResponses: []*storepb.SeriesResponse{ + 
mockSeriesResponse(labels.Labels{metricNameLabel}, minT, 1), + mockSeriesResponse(labels.Labels{metricNameLabel}, minT+1, 2), + mockHintsResponse(block2), + }}: {block2}, + }, + }, + limits: &blocksStoreLimitsMock{}, + queryLimiter: noOpQueryLimiter, + expectedSeries: []seriesResult{ + { + lbls: labels.New(metricNameLabel), + values: []valueResult{ + {t: minT + 2, v: 3}, + {t: minT + 3, v: 4}, + }, + }, + }, + tombstoneSet: purger.NewTombstoneSet([]purger.DeleteRequest{ + {StartTime: model.Time(minT), EndTime: model.Time(minT + 1), Matchers: [][]*labels.Matcher{{metricNameMatcher}}}}, + model.Time(minT), model.Time(minT+1)), + }, + "multiple tombstones exist for the same series and should filter part of returned series": { + finderResult: bucketindex.Blocks{ + {ID: block1}, + {ID: block2}, + }, + storeSetResponses: []interface{}{ + map[BlocksStoreClient][]ulid.ULID{ + &storeGatewayClientMock{remoteAddr: "1.1.1.1", mockedSeriesResponses: []*storepb.SeriesResponse{ + mockSeriesResponse(labels.Labels{metricNameLabel}, minT+1, 2), + mockSeriesResponse(labels.Labels{metricNameLabel}, minT+2, 3), + mockSeriesResponse(labels.Labels{metricNameLabel}, minT+3, 4), + mockSeriesResponse(labels.Labels{metricNameLabel}, minT+7, 2), + mockSeriesResponse(labels.Labels{metricNameLabel}, minT+9, 2), + mockSeriesResponse(labels.Labels{metricNameLabel}, minT+10, 3), + + mockHintsResponse(block1), + }}: {block1}, + &storeGatewayClientMock{remoteAddr: "2.2.2.2", mockedSeriesResponses: []*storepb.SeriesResponse{ + mockSeriesResponse(labels.Labels{metricNameLabel}, minT, 1), + mockSeriesResponse(labels.Labels{metricNameLabel}, minT+1, 2), + mockSeriesResponse(labels.Labels{metricNameLabel}, minT+7, 2), + mockSeriesResponse(labels.Labels{metricNameLabel}, minT+9, 2), + mockSeriesResponse(labels.Labels{metricNameLabel}, minT+10, 3), + + mockHintsResponse(block2), + }}: {block2}, + }, + }, + limits: &blocksStoreLimitsMock{}, + queryLimiter: noOpQueryLimiter, + expectedSeries: []seriesResult{ + { + lbls: labels.New(metricNameLabel), + values: []valueResult{ + {t: minT + 2, v: 3}, + {t: minT + 3, v: 4}, + {t: minT + 10, v: 3}, + }, + }, + }, + tombstoneSet: purger.NewTombstoneSet([]purger.DeleteRequest{ + {StartTime: model.Time(minT), EndTime: model.Time(minT + 1), Matchers: [][]*labels.Matcher{{metricNameMatcher}}}, + {StartTime: model.Time(minT + 6), EndTime: model.Time(minT + 9), Matchers: [][]*labels.Matcher{{metricNameMatcher}}}}, + model.Time(minT), model.Time(minT+9)), + }, + "multiple tombstones exist for the different series and should filter part of returned series": { + finderResult: bucketindex.Blocks{ + {ID: block1}, + {ID: block2}, + }, + storeSetResponses: []interface{}{ + map[BlocksStoreClient][]ulid.ULID{ + &storeGatewayClientMock{remoteAddr: "1.1.1.1", mockedSeriesResponses: []*storepb.SeriesResponse{ + mockSeriesResponse(labels.Labels{metricNameLabel}, minT+1, 2), + mockSeriesResponse(labels.Labels{metricNameLabel}, minT+2, 3), + mockSeriesResponse(labels.Labels{metricNameLabel}, minT+3, 4), + mockSeriesResponse(labels.Labels{metricNameLabel}, minT+7, 2), + mockSeriesResponse(labels.Labels{metricNameLabel}, minT+9, 2), + mockSeriesResponse(labels.Labels{series2Label}, minT+10, 3), + + mockHintsResponse(block1), + }}: {block1}, + &storeGatewayClientMock{remoteAddr: "2.2.2.2", mockedSeriesResponses: []*storepb.SeriesResponse{ + mockSeriesResponse(labels.Labels{metricNameLabel}, minT, 1), + mockSeriesResponse(labels.Labels{metricNameLabel}, minT+1, 2), + mockSeriesResponse(labels.Labels{series2Label}, 
minT+1, 1), + mockSeriesResponse(labels.Labels{series2Label}, minT+7, 2), + mockSeriesResponse(labels.Labels{series2Label}, minT+9, 2), + mockSeriesResponse(labels.Labels{series2Label}, minT+10, 3), + + mockHintsResponse(block2), + }}: {block2}, + }, + }, + limits: &blocksStoreLimitsMock{}, + queryLimiter: noOpQueryLimiter, + expectedSeries: []seriesResult{ + { + lbls: labels.New(metricNameLabel), + values: []valueResult{ + {t: minT + 2, v: 3}, + {t: minT + 3, v: 4}, + {t: minT + 7, v: 2}, + {t: minT + 9, v: 2}, + }, + }, + { + lbls: labels.New(series2Label), + values: []valueResult{ + {t: minT + 1, v: 1}, + {t: minT + 10, v: 3}, + }, + }, + }, + tombstoneSet: purger.NewTombstoneSet([]purger.DeleteRequest{ + {StartTime: model.Time(minT), EndTime: model.Time(minT + 1), Matchers: [][]*labels.Matcher{{metricNameMatcher}}}, + {StartTime: model.Time(minT + 6), EndTime: model.Time(minT + 9), Matchers: [][]*labels.Matcher{{seriesMatcher2}}}}, + model.Time(minT), model.Time(minT+9)), + }, } for testName, testData := range tests { @@ -593,6 +764,7 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { stores := &blocksStoreSetMock{mockedResponses: testData.storeSetResponses} finder := &blocksFinderMock{} finder.On("GetBlocks", mock.Anything, "user-1", minT, maxT).Return(testData.finderResult, map[ulid.ULID]*bucketindex.BlockDeletionMark(nil), testData.finderErr) + finder.On("GetTombstones", mock.Anything, "user-1", mock.Anything, mock.Anything).Return(testData.tombstoneSet, error(nil)) q := &blocksStoreQuerier{ ctx: ctx, @@ -1071,6 +1243,7 @@ func TestBlocksStoreQuerier_Labels(t *testing.T) { stores := &blocksStoreSetMock{mockedResponses: testData.storeSetResponses} finder := &blocksFinderMock{} finder.On("GetBlocks", mock.Anything, "user-1", minT, maxT).Return(testData.finderResult, map[ulid.ULID]*bucketindex.BlockDeletionMark(nil), testData.finderErr) + finder.On("GetTombstones", mock.Anything, "user-1", mock.Anything, mock.Anything).Return(purger.NewTombstoneSet(nil, 0, 0), error(nil)) q := &blocksStoreQuerier{ ctx: ctx, @@ -1167,6 +1340,7 @@ func TestBlocksStoreQuerier_SelectSortedShouldHonorQueryStoreAfter(t *testing.T) t.Run(testName, func(t *testing.T) { finder := &blocksFinderMock{} finder.On("GetBlocks", mock.Anything, "user-1", mock.Anything, mock.Anything).Return(bucketindex.Blocks(nil), map[ulid.ULID]*bucketindex.BlockDeletionMark(nil), error(nil)) + finder.On("GetTombstones", mock.Anything, "user-1", mock.Anything, mock.Anything).Return(purger.NewTombstoneSet(nil, 0, 0), error(nil)) q := &blocksStoreQuerier{ ctx: context.Background(), @@ -1233,6 +1407,7 @@ func TestBlocksStoreQuerier_PromQLExecution(t *testing.T) { {ID: block1}, {ID: block2}, }, map[ulid.ULID]*bucketindex.BlockDeletionMark(nil), error(nil)) + finder.On("GetTombstones", mock.Anything, "user-1", mock.Anything, mock.Anything).Return(purger.NewTombstoneSet(nil, 0, 0), error(nil)) // Mock the store to simulate each block is queried from a different store-gateway. 
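The expected values in the Select test cases above follow from how the deleted series set applies each DeleteRequest: a sample is dropped when its timestamp falls inside a matching request's interval, with both endpoints treated as inclusive (minT and minT+1 are both filtered by a [minT, minT+1] request). A hedged sketch of that predicate, inferred from the test expectations; the matchesAll helper is an assumption, not part of this patch:

```go
// deleted reports whether a sample at ts should be filtered out: true when
// any delete request covers ts (inclusive bounds) and its matchers match
// the series labels, as in the "filter part of returned series" cases above.
func deleted(ts int64, reqs []purger.DeleteRequest, lset labels.Labels) bool {
	for _, r := range reqs {
		if ts < int64(r.StartTime) || ts > int64(r.EndTime) {
			continue
		}
		for _, ms := range r.Matchers {
			if matchesAll(ms, lset) { // assumed helper: every matcher accepts lset
				return true
			}
		}
	}
	return false
}
```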
gateway1 := &storeGatewayClientMock{remoteAddr: "1.1.1.1", mockedSeriesResponses: []*storepb.SeriesResponse{ @@ -1358,6 +1533,11 @@ func (m *blocksFinderMock) GetBlocks(ctx context.Context, userID string, minT, m return args.Get(0).(bucketindex.Blocks), args.Get(1).(map[ulid.ULID]*bucketindex.BlockDeletionMark), args.Error(2) } +func (m *blocksFinderMock) GetTombstones(ctx context.Context, userID string, minT, maxT int64) (*purger.TombstonesSet, error) { + args := m.Called(ctx, userID, minT, maxT) + return args.Get(0).(*purger.TombstonesSet), args.Error(1) +} + type storeGatewayClientMock struct { remoteAddr string mockedSeriesResponses []*storepb.SeriesResponse diff --git a/pkg/storage/tsdb/bucketindex/index.go b/pkg/storage/tsdb/bucketindex/index.go index 5c5f6cb5d4..f3b3052e3d 100644 --- a/pkg/storage/tsdb/bucketindex/index.go +++ b/pkg/storage/tsdb/bucketindex/index.go @@ -38,6 +38,9 @@ type Index struct { // List of block deletion marks. BlockDeletionMarks BlockDeletionMarks `json:"block_deletion_marks"` + // List of tombstones that are required for query time filtering of deleted series. + Tombstones SeriesDeletionTombstones `json:"series_deletion_tombstones"` + // UpdatedAt is a unix timestamp (seconds precision) of when the index has been updated // (written in the storage) the last time. UpdatedAt int64 `json:"updated_at"` @@ -258,3 +261,15 @@ func (s Blocks) String() string { return b.String() } + +// Holds all the tombstones that are currently required to be used for +// filtering queries in the queriers. +type SeriesDeletionTombstones []*cortex_tsdb.Tombstone + +func (s SeriesDeletionTombstones) GetRequestIDs() []string { + ids := make([]string, len(s)) + for i, t := range s { + ids[i] = t.RequestID + } + return ids +} diff --git a/pkg/storage/tsdb/bucketindex/storage_test.go b/pkg/storage/tsdb/bucketindex/storage_test.go index 022dad0f95..55d8e996d7 100644 --- a/pkg/storage/tsdb/bucketindex/storage_test.go +++ b/pkg/storage/tsdb/bucketindex/storage_test.go @@ -51,7 +51,7 @@ func TestReadIndex_ShouldReturnTheParsedIndexOnSuccess(t *testing.T) { testutil.MockStorageDeletionMark(t, bkt, userID, testutil.MockStorageBlock(t, bkt, userID, 30, 40)) // Write the index. - u := NewUpdater(bkt, userID, nil, logger) + u := NewUpdater(bkt, userID, nil, blockDeletionDelay, blocksCleanupInterval, newMockBucketStoreCfg(), logger) expectedIdx, _, err := u.UpdateIndex(ctx, nil) require.NoError(t, err) require.NoError(t, WriteIndex(ctx, bkt, userID, nil, expectedIdx)) @@ -88,7 +88,7 @@ func BenchmarkReadIndex(b *testing.B) { } // Write the index. 
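Once written, the tombstones travel with the rest of the index, and a reader can list the active request IDs the same way the cleaner tests do. A minimal sketch, assuming the existing bucketindex.ReadIndex helper exercised in storage_test.go:

```go
// listTombstoneIDs reads the per-tenant bucket index back and returns the
// request IDs of the tombstones it carries.
func listTombstoneIDs(ctx context.Context, bkt objstore.Bucket, userID string, logger log.Logger) ([]string, error) {
	idx, err := bucketindex.ReadIndex(ctx, bkt, userID, nil, logger)
	if err != nil {
		return nil, err
	}
	return idx.Tombstones.GetRequestIDs(), nil
}
```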
- u := NewUpdater(bkt, userID, nil, logger) + u := NewUpdater(bkt, userID, nil, blockDeletionDelay, blocksCleanupInterval, newMockBucketStoreCfg(), logger) idx, _, err := u.UpdateIndex(ctx, nil) require.NoError(b, err) require.NoError(b, WriteIndex(ctx, bkt, userID, nil, idx)) diff --git a/pkg/storage/tsdb/bucketindex/updater.go b/pkg/storage/tsdb/bucketindex/updater.go index ec69f10dae..a15767539f 100644 --- a/pkg/storage/tsdb/bucketindex/updater.go +++ b/pkg/storage/tsdb/bucketindex/updater.go @@ -17,6 +17,7 @@ import ( "github.com/thanos-io/thanos/pkg/runutil" "github.com/cortexproject/cortex/pkg/storage/bucket" + cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb" util_log "github.com/cortexproject/cortex/pkg/util/log" ) @@ -25,18 +26,36 @@ var ( ErrBlockMetaCorrupted = block.ErrorSyncMetaCorrupted ErrBlockDeletionMarkNotFound = errors.New("block deletion mark not found") ErrBlockDeletionMarkCorrupted = errors.New("block deletion mark corrupted") + ErrTombstoneNotFound = errors.New("Tombstone file not found") + ErrTombstoneCorrupted = errors.New("Tombstone file corrupted") ) // Updater is responsible to generate an update in-memory bucket index. type Updater struct { - bkt objstore.InstrumentedBucket - logger log.Logger + bkt objstore.InstrumentedBucket + blocksDeletionDelay time.Duration + blocksCleanupInterval time.Duration + bktCfg cortex_tsdb.BucketStoreConfig + tManager *cortex_tsdb.TombstoneManager + logger log.Logger } -func NewUpdater(bkt objstore.Bucket, userID string, cfgProvider bucket.TenantConfigProvider, logger log.Logger) *Updater { +func NewUpdater( + bkt objstore.Bucket, + userID string, + cfgProvider bucket.TenantConfigProvider, + deletionDelay time.Duration, + cleanupInterval time.Duration, + bktCfg cortex_tsdb.BucketStoreConfig, + logger log.Logger) *Updater { + return &Updater{ - bkt: bucket.NewUserBucketClient(userID, bkt, cfgProvider), - logger: util_log.WithUserID(userID, logger), + bkt: bucket.NewUserBucketClient(userID, bkt, cfgProvider), + blocksDeletionDelay: deletionDelay, + blocksCleanupInterval: cleanupInterval, + bktCfg: bktCfg, + tManager: cortex_tsdb.NewTombstoneManager(bkt, userID, cfgProvider, logger), + logger: util_log.WithUserID(userID, logger), } } @@ -45,11 +64,13 @@ func NewUpdater(bkt objstore.Bucket, userID string, cfgProvider bucket.TenantCon func (w *Updater) UpdateIndex(ctx context.Context, old *Index) (*Index, map[ulid.ULID]error, error) { var oldBlocks []*Block var oldBlockDeletionMarks []*BlockDeletionMark + var oldTombstones []*cortex_tsdb.Tombstone // Read the old index, if provided. 
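Callers of the updater now have to supply the two compactor intervals plus the bucket-store config, which together determine how long processed tombstones keep being indexed. A construction sketch with illustrative durations (the compactor passes its real c.storageCfg.BucketStore, as shown earlier in this patch):

```go
// Illustrative wiring of the extended constructor; the durations are examples.
w := bucketindex.NewUpdater(
	bkt, userID, cfgProvider,
	12*time.Hour,   // -compactor.deletion-delay
	15*time.Minute, // -compactor.cleanup-interval
	storageCfg.BucketStore,
	logger,
)
idx, partials, err := w.UpdateIndex(ctx, oldIdx)
```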
if old != nil { oldBlocks = old.Blocks oldBlockDeletionMarks = old.BlockDeletionMarks + oldTombstones = old.Tombstones } blocks, partials, err := w.updateBlocks(ctx, oldBlocks) @@ -62,10 +83,16 @@ return nil, nil, err } + tombstones, err := w.updateSeriesDeletionTombstones(ctx, oldTombstones) + if err != nil { + return nil, nil, err + } + return &Index{ Version: IndexVersion1, Blocks: blocks, BlockDeletionMarks: blockDeletionMarks, + Tombstones: tombstones, UpdatedAt: time.Now().Unix(), }, partials, nil } @@ -223,3 +250,39 @@ func (w *Updater) updateBlockDeletionMarkIndexEntry(ctx context.Context, id ulid return BlockDeletionMarkFromThanosMarker(&m), nil } + +func (w *Updater) updateSeriesDeletionTombstones(ctx context.Context, oldTombstones []*cortex_tsdb.Tombstone) ([]*cortex_tsdb.Tombstone, error) { + out := make([]*cortex_tsdb.Tombstone, 0, len(oldTombstones)) + tombstones, err := w.tManager.GetAllDeleteRequestsForUser(ctx, oldTombstones) + if err != nil { + return nil, err + } + + for _, t := range tombstones { + if w.isTombstoneForFiltering(t) { + out = append(out, t) + } + } + + return out, nil +} + +// TODO move this function to the tombstones.go file +func (w *Updater) isTombstoneForFiltering(t *cortex_tsdb.Tombstone) bool { + if t.State == cortex_tsdb.StatePending { + return true + } + + // Once the tombstone has been processed for permanent deletion, the data has been rewritten and + // the old blocks have been marked for deletion. + // The tombstones need to be used for query-time filtering until we can guarantee that the queriers + // have picked up the new blocks and will no longer query any of the deleted blocks. + // This time should be enough to guarantee that the new blocks will be queried: + filterTimeAfterProcessed := w.bktCfg.SyncInterval + w.blocksDeletionDelay + w.blocksCleanupInterval + if t.State == cortex_tsdb.StateProcessed && time.Since(t.GetStateTime()) < filterTimeAfterProcessed { + return true + } + + return false +} diff --git a/pkg/storage/tsdb/bucketindex/updater_test.go b/pkg/storage/tsdb/bucketindex/updater_test.go index 93b0135821..c7ac7603f4 100644 --- a/pkg/storage/tsdb/bucketindex/updater_test.go +++ b/pkg/storage/tsdb/bucketindex/updater_test.go @@ -7,6 +7,7 @@ import ( "testing" "time" + cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb" "github.com/go-kit/kit/log" "github.com/oklog/ulid" "github.com/pkg/errors" @@ -21,6 +22,11 @@ import ( "github.com/cortexproject/cortex/pkg/storage/tsdb/testutil" ) +const ( + blockDeletionDelay = time.Minute * 5 + blocksCleanupInterval = time.Minute * 10 +) + func TestUpdater_UpdateIndex(t *testing.T) { const userID = "user-1" @@ -34,33 +40,40 @@ func TestUpdater_UpdateIndex(t *testing.T) { block1 := testutil.MockStorageBlock(t, bkt, userID, 10, 20) block2 := testutil.MockStorageBlock(t, bkt, userID, 20, 30) block2Mark := testutil.MockStorageDeletionMark(t, bkt, userID, block2) + tombstone1 := testutil.MockTombstone(t, bkt, userID, 0, 0, 0, 0, []string{"series"}, "request1", cortex_tsdb.StatePending) + tombstone2 := testutil.MockTombstone(t, bkt, userID, 0, 0, 0, 0, []string{"series"}, "request2", cortex_tsdb.StatePending) - w := NewUpdater(bkt, userID, nil, logger) + w := NewUpdater(bkt, userID, nil, blockDeletionDelay, blocksCleanupInterval, newMockBucketStoreCfg(), logger) returnedIdx, _, err := w.UpdateIndex(ctx, nil) require.NoError(t, err) assertBucketIndexEqual(t, returnedIdx, bkt, userID,
[]tsdb.BlockMeta{block1, block2}, - []*metadata.DeletionMark{block2Mark}) + []*metadata.DeletionMark{block2Mark}, + []*cortex_tsdb.Tombstone{tombstone1, tombstone2}) // Create new blocks, and update the index. block3 := testutil.MockStorageBlock(t, bkt, userID, 30, 40) block4 := testutil.MockStorageBlock(t, bkt, userID, 40, 50) block4Mark := testutil.MockStorageDeletionMark(t, bkt, userID, block4) + tombstone3 := testutil.MockTombstone(t, bkt, userID, 0, 0, 0, 0, []string{"series"}, "request3", cortex_tsdb.StatePending) returnedIdx, _, err = w.UpdateIndex(ctx, returnedIdx) require.NoError(t, err) assertBucketIndexEqual(t, returnedIdx, bkt, userID, []tsdb.BlockMeta{block1, block2, block3, block4}, - []*metadata.DeletionMark{block2Mark, block4Mark}) + []*metadata.DeletionMark{block2Mark, block4Mark}, + []*cortex_tsdb.Tombstone{tombstone1, tombstone2, tombstone3}) - // Hard delete a block and update the index. + // Hard delete a block and tombstone and update the index. require.NoError(t, block.Delete(ctx, log.NewNopLogger(), bucket.NewUserBucketClient(userID, bkt, nil), block2.ULID)) + require.NoError(t, w.tManager.DeleteTombstoneFile(ctx, tombstone1.RequestID, tombstone1.State)) returnedIdx, _, err = w.UpdateIndex(ctx, returnedIdx) require.NoError(t, err) assertBucketIndexEqual(t, returnedIdx, bkt, userID, []tsdb.BlockMeta{block1, block3, block4}, - []*metadata.DeletionMark{block4Mark}) + []*metadata.DeletionMark{block4Mark}, + []*cortex_tsdb.Tombstone{tombstone2, tombstone3}) } func TestUpdater_UpdateIndex_ShouldSkipPartialBlocks(t *testing.T) { @@ -77,16 +90,18 @@ func TestUpdater_UpdateIndex_ShouldSkipPartialBlocks(t *testing.T) { block2 := testutil.MockStorageBlock(t, bkt, userID, 20, 30) block3 := testutil.MockStorageBlock(t, bkt, userID, 30, 40) block2Mark := testutil.MockStorageDeletionMark(t, bkt, userID, block2) + tombstone1 := testutil.MockTombstone(t, bkt, userID, 0, 0, 0, 0, []string{"series"}, "request1", cortex_tsdb.StatePending) // Delete a block's meta.json to simulate a partial block. require.NoError(t, bkt.Delete(ctx, path.Join(userID, block3.ULID.String(), metadata.MetaFilename))) - w := NewUpdater(bkt, userID, nil, logger) + w := NewUpdater(bkt, userID, nil, blockDeletionDelay, blocksCleanupInterval, newMockBucketStoreCfg(), logger) idx, partials, err := w.UpdateIndex(ctx, nil) require.NoError(t, err) assertBucketIndexEqual(t, idx, bkt, userID, []tsdb.BlockMeta{block1, block2}, - []*metadata.DeletionMark{block2Mark}) + []*metadata.DeletionMark{block2Mark}, + []*cortex_tsdb.Tombstone{tombstone1}) assert.Len(t, partials, 1) assert.True(t, errors.Is(partials[block3.ULID], ErrBlockMetaNotFound)) @@ -106,16 +121,18 @@ func TestUpdater_UpdateIndex_ShouldSkipBlocksWithCorruptedMeta(t *testing.T) { block2 := testutil.MockStorageBlock(t, bkt, userID, 20, 30) block3 := testutil.MockStorageBlock(t, bkt, userID, 30, 40) block2Mark := testutil.MockStorageDeletionMark(t, bkt, userID, block2) + tombstone1 := testutil.MockTombstone(t, bkt, userID, 0, 0, 0, 0, []string{"series"}, "request1", cortex_tsdb.StatePending) // Overwrite a block's meta.json with invalid data. 
require.NoError(t, bkt.Upload(ctx, path.Join(userID, block3.ULID.String(), metadata.MetaFilename), bytes.NewReader([]byte("invalid!}")))) - w := NewUpdater(bkt, userID, nil, logger) + w := NewUpdater(bkt, userID, nil, blockDeletionDelay, blocksCleanupInterval, newMockBucketStoreCfg(), logger) idx, partials, err := w.UpdateIndex(ctx, nil) require.NoError(t, err) assertBucketIndexEqual(t, idx, bkt, userID, []tsdb.BlockMeta{block1, block2}, - []*metadata.DeletionMark{block2Mark}) + []*metadata.DeletionMark{block2Mark}, + []*cortex_tsdb.Tombstone{tombstone1}) assert.Len(t, partials, 1) assert.True(t, errors.Is(partials[block3.ULID], ErrBlockMetaCorrupted)) @@ -139,12 +156,136 @@ func TestUpdater_UpdateIndex_ShouldSkipCorruptedDeletionMarks(t *testing.T) { // Overwrite a block's deletion-mark.json with invalid data. require.NoError(t, bkt.Upload(ctx, path.Join(userID, block2Mark.ID.String(), metadata.DeletionMarkFilename), bytes.NewReader([]byte("invalid!}")))) - w := NewUpdater(bkt, userID, nil, logger) + w := NewUpdater(bkt, userID, nil, blockDeletionDelay, blocksCleanupInterval, newMockBucketStoreCfg(), logger) idx, partials, err := w.UpdateIndex(ctx, nil) require.NoError(t, err) assertBucketIndexEqual(t, idx, bkt, userID, []tsdb.BlockMeta{block1, block2, block3}, - []*metadata.DeletionMark{}) + []*metadata.DeletionMark{}, + []*cortex_tsdb.Tombstone{}) assert.Empty(t, partials) +} + +func TestUpdater_UpdateIndex_ShouldSkipCorruptedTombstones(t *testing.T) { + const userID = "user-1" + + bkt, _ := testutil.PrepareFilesystemBucket(t) + + ctx := context.Background() + logger := log.NewNopLogger() + + // Mock some blocks and tombstones in the storage. + bkt = BucketWithGlobalMarkers(bkt) + block1 := testutil.MockStorageBlock(t, bkt, userID, 10, 20) + tombstone1 := testutil.MockTombstone(t, bkt, userID, 0, 0, 0, 0, []string{"series"}, "request1", cortex_tsdb.StatePending) + tombstone2 := testutil.MockTombstone(t, bkt, userID, 0, 0, 0, 0, []string{"series"}, "request2", cortex_tsdb.StatePending) + + // Overwrite a tombstone with invalid data. + require.NoError(t, bkt.Upload(ctx, path.Join(userID, cortex_tsdb.TombstonePath, tombstone1.GetFilename()), bytes.NewReader([]byte("invalid!}")))) + + w := NewUpdater(bkt, userID, nil, blockDeletionDelay, blocksCleanupInterval, newMockBucketStoreCfg(), logger) + idx, partials, err := w.UpdateIndex(ctx, nil) + require.NoError(t, err) + assertBucketIndexEqual(t, idx, bkt, userID, + []tsdb.BlockMeta{block1}, + []*metadata.DeletionMark{}, + []*cortex_tsdb.Tombstone{tombstone2}) + assert.Empty(t, partials) +} + +func TestUpdater_UpdateIndex_ShouldNotUploadProcessedTombstonesPassedFilteringPeriod(t *testing.T) { + const userID = "user-1" + + bkt, _ := testutil.PrepareFilesystemBucket(t) + + ctx := context.Background() + logger := log.NewNopLogger() + + // The bucket index stores all the tombstones that are needed for query-time filtering: + // all pending-state tombstones are included, while processed-state tombstones are included + // only if the following time period hasn't passed since they were updated to processed: + // -compactor.deletion-delay + -compactor.cleanup-interval + -blocks-storage.bucket-store.sync-interval + + bktStoreCfg := cortex_tsdb.BucketStoreConfig{ + SyncInterval: time.Minute, + } + + blockDeletionDelay := time.Minute * 2 + blocksCleanupInterval := time.Minute * 3 + + // Mock some blocks and tombstones in the storage. + bkt = BucketWithGlobalMarkers(bkt) + block1 := testutil.MockStorageBlock(t, bkt, userID, 10, 20) + tombstone1 := testutil.MockTombstone(t, bkt, userID, 0, 0, 0, 0, []string{"series"}, "request1", cortex_tsdb.StatePending) + // request2 should be included because it is still in the pending state + tombstone2 := testutil.MockTombstone(t, bkt, userID, 0, time.Now().Add(-time.Minute*7).Unix()*1000, 0, 0, []string{"series"}, "request2", cortex_tsdb.StatePending) + // request3 should not be included in the index since the filtering time period has passed + testutil.MockTombstone(t, bkt, userID, 0, time.Now().Add(-time.Minute*7).Unix()*1000, 0, 0, []string{"series"}, "request3", cortex_tsdb.StateProcessed) + tombstone4 := testutil.MockTombstone(t, bkt, userID, 0, time.Now().Add(-time.Minute*5).Unix()*1000, 0, 0, []string{"series"}, "request4", cortex_tsdb.StateProcessed) + // request5 should not be included since cancelled tombstones are not required for filtering + testutil.MockTombstone(t, bkt, userID, 0, 0, 0, 0, []string{"series"}, "request5", cortex_tsdb.StateCancelled) + + w := NewUpdater(bkt, userID, nil, blockDeletionDelay, blocksCleanupInterval, bktStoreCfg, logger) + idx, partials, err := w.UpdateIndex(ctx, nil) + require.NoError(t, err) + assertBucketIndexEqual(t, idx, bkt, userID, + []tsdb.BlockMeta{block1}, + []*metadata.DeletionMark{}, + []*cortex_tsdb.Tombstone{tombstone1, tombstone2, tombstone4}) + assert.Empty(t, partials) +} + +func TestUpdater_UpdateIndex_ShouldNotUploadDuplicateTombstones(t *testing.T) { + const userID = "user-1" + + bkt, _ := testutil.PrepareFilesystemBucket(t) + + ctx := context.Background() + logger := log.NewNopLogger() + + // There could be a scenario where a tombstone is updated to a new state + // but the file with the old state is not deleted. The bucket index should + // only keep the entry with the most up-to-date state. + + // The states, in order, are: pending, processed, cancelled. + // A request can only be cancelled while pending, but the cancelled state takes priority over all others. + + bktStoreCfg := cortex_tsdb.BucketStoreConfig{ + SyncInterval: time.Minute, + } + + blockDeletionDelay := time.Minute * 2 + blocksCleanupInterval := time.Minute * 3 + + // Mock some blocks and tombstones in the storage.
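Worked numbers for the filtering-period test above: the window is SyncInterval + deletion delay + cleanup interval = 1m + 2m + 3m = 6m, so request3 (processed 7m ago) falls out of the index while request4 (processed 5m ago) is kept. The same 6m window governs the duplicate-state test being set up here:

```go
// The cutoff mirrored from isTombstoneForFiltering in updater.go; values from the test above.
window := bktStoreCfg.SyncInterval + blockDeletionDelay + blocksCleanupInterval // 6m

fmt.Println(7*time.Minute < window) // false: request3 dropped from the index
fmt.Println(5*time.Minute < window) // true:  request4 kept for filtering
```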
@@ -155,7 +296,7 @@ func TestUpdater_UpdateIndex_NoTenantInTheBucket(t *testing.T) {
 	bkt, _ := testutil.PrepareFilesystemBucket(t)

 	for _, oldIdx := range []*Index{nil, {}} {
-		w := NewUpdater(bkt, userID, nil, log.NewNopLogger())
+		w := NewUpdater(bkt, userID, nil, blockDeletionDelay, blocksCleanupInterval, newMockBucketStoreCfg(), log.NewNopLogger())
 		idx, partials, err := w.UpdateIndex(ctx, oldIdx)

 		require.NoError(t, err)
@@ -176,7 +317,7 @@ func getBlockUploadedAt(t testing.TB, bkt objstore.Bucket, userID string, blockI
 	return attrs.LastModified.Unix()
 }

-func assertBucketIndexEqual(t testing.TB, idx *Index, bkt objstore.Bucket, userID string, expectedBlocks []tsdb.BlockMeta, expectedDeletionMarks []*metadata.DeletionMark) {
+func assertBucketIndexEqual(t testing.TB, idx *Index, bkt objstore.Bucket, userID string, expectedBlocks []tsdb.BlockMeta, expectedDeletionMarks []*metadata.DeletionMark, expectedTombstones SeriesDeletionTombstones) {
 	assert.Equal(t, IndexVersion1, idx.Version)
 	assert.InDelta(t, time.Now().Unix(), idx.UpdatedAt, 2)

@@ -203,4 +344,14 @@ func assertBucketIndexEqual(t testing.TB, idx *Index, bkt objstore.Bucket, userI
 	}

 	assert.ElementsMatch(t, expectedMarkEntries, idx.BlockDeletionMarks)
+	assert.ElementsMatch(t, expectedTombstones, idx.Tombstones)
+}
+
+func newMockBucketStoreCfg() cortex_tsdb.BucketStoreConfig {
+	return cortex_tsdb.BucketStoreConfig{
+		SyncInterval: time.Minute,
+		BucketIndex: cortex_tsdb.BucketIndexConfig{
+			MaxStalePeriod: time.Hour,
+		},
+	}
 }
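A note on the new assertBucketIndexEqual signature: the parameter is typed SeriesDeletionTombstones while the tests pass []*cortex_tsdb.Tombstone literals. That compiles only if SeriesDeletionTombstones is a named slice type with that underlying type, presumably defined in the bucketindex package along these lines (assumed, not shown in this diff):

package bucketindex

import cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb"

// Assumed definition: a named slice type whose underlying type matches the
// []*cortex_tsdb.Tombstone literals the tests pass, so those calls compile
// through Go's implicit assignability of an unnamed slice to a named one.
type SeriesDeletionTombstones []*cortex_tsdb.Tombstone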
diff --git a/pkg/storage/tsdb/testutil/block_mock.go b/pkg/storage/tsdb/testutil/block_mock.go
index a2a931aa59..2fa2903225 100644
--- a/pkg/storage/tsdb/testutil/block_mock.go
+++ b/pkg/storage/tsdb/testutil/block_mock.go
@@ -1,14 +1,17 @@
 package testutil

 import (
+	"bytes"
 	"context"
 	"crypto/rand"
 	"encoding/json"
 	"fmt"
+	"path"
 	"strings"
 	"testing"
 	"time"

+	cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb"
 	"github.com/oklog/ulid"
 	"github.com/prometheus/prometheus/tsdb"
 	"github.com/stretchr/testify/require"
@@ -66,3 +69,31 @@ func MockStorageDeletionMark(t testing.TB, bucket objstore.Bucket, userID string

 	return &mark
 }
+
+func MockTombstone(
+	t testing.TB,
+	bucket objstore.Bucket,
+	userID string,
+	requestTime,
+	stateTime,
+	startTime,
+	endTime int64,
+	selectors []string,
+	requestID string,
+	state cortex_tsdb.BlockDeleteRequestState) *cortex_tsdb.Tombstone {
+
+	ts := cortex_tsdb.NewTombstone(userID, requestTime, stateTime, startTime, endTime, selectors, requestID, state)
+
+	tombstoneFilename := ts.GetFilename()
+	path := path.Join(userID, cortex_tsdb.TombstonePath, tombstoneFilename)
+	data, err := json.Marshal(ts)
+	require.NoError(t, err)
+	require.NoError(t, bucket.Upload(context.Background(), path, bytes.NewReader(data)))
+
+	ts.Matchers, err = cortex_tsdb.ParseMatchers(ts.Selectors)
+	require.NoError(t, err)
+
+	return ts
+}
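MockTombstone writes objects under the same layout the production code reads. A small sketch of that layout, assuming TombstonePath resolves to "tombstones" (consistent with the literal path built in tombstones_test.go further below); tombstoneObjectPath is a hypothetical helper:

package main

import (
	"fmt"
	"path"
)

// tombstonesDir mirrors cortex_tsdb.TombstonePath, which the literal path in
// tombstones_test.go suggests resolves to "tombstones" (an assumption here).
const tombstonesDir = "tombstones"

// tombstoneObjectPath shows the object layout that MockTombstone uploads to
// and getTombstoneFileName encodes: <userID>/tombstones/<requestID>.<state>.json
func tombstoneObjectPath(userID, requestID, state string) string {
	return path.Join(userID, tombstonesDir, requestID+"."+state+".json")
}

func main() {
	// e.g. user-1/tombstones/request1.pending.json
	fmt.Println(tombstoneObjectPath("user-1", "request1", "pending"))
}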
diff --git a/pkg/storage/tsdb/tombstones.go b/pkg/storage/tsdb/tombstones.go
index fa6c0bb1ab..5bdd3c441b 100644
--- a/pkg/storage/tsdb/tombstones.go
+++ b/pkg/storage/tsdb/tombstones.go
@@ -143,27 +143,37 @@ func (m *TombstoneManager) GetDeleteRequestByIDForUser(ctx context.Context, requ
 }

-func (m *TombstoneManager) GetAllDeleteRequestsForUser(ctx context.Context) ([]*Tombstone, error) {
+func (m *TombstoneManager) GetAllDeleteRequestsForUser(ctx context.Context, prevLoadedTombstones []*Tombstone) ([]*Tombstone, error) {
 	// add all the tombstones to a map and check for duplicates,
 	// if a key exists with the same request ID (but two different states)
-	tombstoneMap := make(map[string]*Tombstone)
+	out := make([]*Tombstone, 0, len(prevLoadedTombstones))
+	discovered := make(map[string]BlockDeleteRequestState)
+
 	err := m.bkt.Iter(ctx, TombstonePath, func(s string) error {
-		t, err := m.ReadTombstoneFile(ctx, s)
+		tombstoneName := filepath.Base(s)
+		requestID, state, err := GetTombstoneStateAndRequestIDFromPath(tombstoneName)
 		if err != nil {
 			return err
 		}

-		if _, exists := tombstoneMap[t.RequestID]; !exists {
-			tombstoneMap[t.RequestID] = t
+		if prevState, exists := discovered[requestID]; !exists {
+			discovered[requestID] = state
 		} else {
 			// if there is more than one tombstone for a given request,
-			// we only want to return the latest state. The older file
-			// will be cleaned by the compactor
-			newT, err := m.getLatestTombstateByState(t, tombstoneMap[t.RequestID])
+			// we only want to keep track of the one with the latest state
+			orderA, err := state.GetStateOrder()
 			if err != nil {
 				return err
 			}
-			tombstoneMap[t.RequestID] = newT
+			orderB, err := prevState.GetStateOrder()
+			if err != nil {
+				return err
+			}
+
+			// If the new state found is the latest, then we replace the tombstone state in the map.
+			if orderA > orderB {
+				discovered[requestID] = state
+			}
 		}
 		return nil
 	})
@@ -172,12 +182,38 @@ func (m *TombstoneManager) GetAllDeleteRequestsForUser(ctx context.Context) ([]*
 		return nil, err
 	}

-	deletionRequests := []*Tombstone{}
-	for _, t := range tombstoneMap {
-		deletionRequests = append(deletionRequests, t)
+	// Since tombstones are immutable, all tombstones already existing in the index can just be copied.
+	for _, t := range prevLoadedTombstones {
+		if state, ok := discovered[t.RequestID]; ok && state == t.State {
+			out = append(out, t)
+			delete(discovered, t.RequestID)
+		}
 	}

-	return deletionRequests, nil
+	// Remaining tombstones are new ones and we have to fetch them.
+	for id, state := range discovered {
+		filename := getTombstoneFileName(id, state)
+		t, err := m.ReadTombstoneFile(ctx, path.Join(TombstonePath, filename))
+		if errors.Is(err, ErrTombstoneNotFound) {
+			// This could happen if the series deletion cleaner removes the tombstone or the user
+			// cancels it between the "list objects" call and now.
+			level.Warn(m.logger).Log("msg", "skipped missing tombstone file when loading all the tombstones", "requestID", id, "state", string(state))
+			continue
+		}
+		if errors.Is(err, ErrTombstoneDecode) {
+			level.Error(m.logger).Log("msg", "skipped corrupted tombstone file when loading all the tombstones", "requestID", id, "state", state, "err", err)
+			continue
+		}
+		if err != nil {
+			return nil, err
+		}
+
+		out = append(out, t)
+	}
+
+	return out, nil
 }

 func (m *TombstoneManager) getLatestTombstateByState(a *Tombstone, b *Tombstone) (*Tombstone, error) {
@@ -330,6 +366,10 @@ func (t *Tombstone) GetCreateTime() time.Time {
 	return time.Unix(t.RequestCreatedAt/1000, 0)
 }

+func (t *Tombstone) GetStateTime() time.Time {
+	return time.Unix(t.StateCreatedAt/1000, 0)
+}
+
 func getTombstoneFileName(requestID string, state BlockDeleteRequestState) string {
 	return requestID + "." + string(state) + ".json"
 }
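The reworked GetAllDeleteRequestsForUser above is a two-phase load: reuse what the previous index already holds, then fetch only what is new. Stripped of the bucket and logging details, the shape is roughly this (a sketch with simplified stand-in types, not the production code):

package tombstoneload

// tombstone is a pared-down stand-in for cortex_tsdb.Tombstone.
type tombstone struct {
	requestID string
	state     string
}

// reuseOrFetch sketches the two-phase load: tombstones already present in the
// previous index are copied as-is (tombstone files are immutable), and only
// request IDs still left in discovered are read from the bucket via fetch.
func reuseOrFetch(prev []*tombstone, discovered map[string]string, fetch func(requestID, state string) (*tombstone, error)) ([]*tombstone, error) {
	out := make([]*tombstone, 0, len(prev))
	for _, t := range prev {
		if state, ok := discovered[t.requestID]; ok && state == t.state {
			out = append(out, t)
			delete(discovered, t.requestID)
		}
	}
	for id, state := range discovered {
		t, err := fetch(id, state)
		if err != nil {
			return nil, err // the real code also skips not-found and corrupted files
		}
		out = append(out, t)
	}
	return out, nil
}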
diff --git a/pkg/storage/tsdb/tombstones_test.go b/pkg/storage/tsdb/tombstones_test.go
index 3238f19bd1..0614522311 100644
--- a/pkg/storage/tsdb/tombstones_test.go
+++ b/pkg/storage/tsdb/tombstones_test.go
@@ -222,7 +222,7 @@ func TestGetAllTombstones(t *testing.T) {
 		require.NoError(t, tManager.WriteTombstoneFile(ctx, ts))
 	}

-	tombstonesOutput, err := tManager.GetAllDeleteRequestsForUser(ctx)
+	tombstonesOutput, err := tManager.GetAllDeleteRequestsForUser(ctx, nil)
 	require.NoError(t, err)

 	outputMap := make(map[string]BlockDeleteRequestState)
@@ -267,4 +267,10 @@ func TestTombstoneReadWithInvalidFileName(t *testing.T) {
 		require.ErrorIs(t, err, ErrInvalidDeletionRequestState)
 	}

+	{
+		tNotExists := username + "/tombstones/" + requestID + "." + string(StatePending) + ".json"
+		_, err := tManager.ReadTombstoneFile(ctx, tNotExists)
+		require.ErrorIs(t, err, ErrTombstoneNotFound)
+	}
+
 }
diff --git a/pkg/storegateway/gateway_test.go b/pkg/storegateway/gateway_test.go
index 84fb469ea9..79aa0b7458 100644
--- a/pkg/storegateway/gateway_test.go
+++ b/pkg/storegateway/gateway_test.go
@@ -1112,7 +1112,7 @@ func (m *mockShardingStrategy) FilterBlocks(ctx context.Context, userID string,
 }

 func createBucketIndex(t *testing.T, bkt objstore.Bucket, userID string) *bucketindex.Index {
-	updater := bucketindex.NewUpdater(bkt, userID, nil, log.NewNopLogger())
+	updater := bucketindex.NewUpdater(bkt, userID, nil, time.Minute, time.Minute, cortex_tsdb.BucketStoreConfig{}, log.NewNopLogger())
 	idx, _, err := updater.UpdateIndex(context.Background(), nil)
 	require.NoError(t, err)
 	require.NoError(t, bucketindex.WriteIndex(context.Background(), bkt, userID, nil, idx))
diff --git a/pkg/storegateway/metadata_fetcher_filters_test.go b/pkg/storegateway/metadata_fetcher_filters_test.go
index 84e6ec0cfe..43acab37cc 100644
--- a/pkg/storegateway/metadata_fetcher_filters_test.go
+++ b/pkg/storegateway/metadata_fetcher_filters_test.go
@@ -20,6 +20,7 @@ import (
 	"github.com/thanos-io/thanos/pkg/objstore"

 	"github.com/cortexproject/cortex/pkg/storage/bucket"
+	cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb"
 	"github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex"
 	cortex_testutil "github.com/cortexproject/cortex/pkg/storage/tsdb/testutil"
 )
@@ -68,7 +69,7 @@ func testIgnoreDeletionMarkFilter(t *testing.T, bucketIndexEnabled bool) {
 	if bucketIndexEnabled {
 		var err error

-		u := bucketindex.NewUpdater(bkt, userID, nil, logger)
+		u := bucketindex.NewUpdater(bkt, userID, nil, time.Minute, time.Minute, cortex_tsdb.BucketStoreConfig{}, logger)
 		idx, _, err = u.UpdateIndex(ctx, nil)
 		require.NoError(t, err)
 		require.NoError(t, bucketindex.WriteIndex(ctx, bkt, userID, nil, idx))
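For callers, the practical effect of the new prevLoadedTombstones argument is that tombstones loaded during a previous pass can seed the next one, so only newly discovered files are read from the bucket; callers without prior state pass nil, as the updated tests do. A hypothetical caller (refreshTombstones is not part of this change) might look like:

package tombstonerefresh

import (
	"context"

	cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb"
)

// refreshTombstones reloads a user's delete requests, reusing tombstones that
// were already loaded on a previous pass to avoid re-reading every file.
func refreshTombstones(ctx context.Context, m *cortex_tsdb.TombstoneManager, prev []*cortex_tsdb.Tombstone) ([]*cortex_tsdb.Tombstone, error) {
	return m.GetAllDeleteRequestsForUser(ctx, prev)
}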