From 39fe212c86097dcdc822389ae5b8756e69470a7a Mon Sep 17 00:00:00 2001 From: ilangofman Date: Thu, 29 Jul 2021 13:15:57 -0400 Subject: [PATCH 1/5] Add query time filtering Signed-off-by: ilangofman --- pkg/chunk/purger/tombstones.go | 8 + pkg/compactor/blocks_cleaner.go | 6 +- pkg/compactor/blocks_cleaner_test.go | 67 +++++-- pkg/compactor/compactor.go | 2 +- pkg/compactor/compactor_test.go | 18 ++ pkg/querier/blocks_finder_bucket_index.go | 73 ++++++++ .../blocks_finder_bucket_index_test.go | 118 +++++++++++- pkg/querier/blocks_finder_bucket_scan.go | 6 + pkg/querier/blocks_store_queryable.go | 25 ++- pkg/storage/tsdb/bucketindex/index.go | 15 ++ pkg/storage/tsdb/bucketindex/storage_test.go | 4 +- pkg/storage/tsdb/bucketindex/updater.go | 150 ++++++++++++++- pkg/storage/tsdb/bucketindex/updater_test.go | 177 ++++++++++++++++-- pkg/storage/tsdb/testutil/block_mock.go | 31 +++ pkg/storage/tsdb/tombstones.go | 86 +++++---- pkg/storage/tsdb/tombstones_test.go | 12 +- pkg/storegateway/gateway_test.go | 2 +- .../metadata_fetcher_filters_test.go | 3 +- 18 files changed, 714 insertions(+), 89 deletions(-) diff --git a/pkg/chunk/purger/tombstones.go b/pkg/chunk/purger/tombstones.go index fdf2cc0914..e8722b27c4 100644 --- a/pkg/chunk/purger/tombstones.go +++ b/pkg/chunk/purger/tombstones.go @@ -48,6 +48,14 @@ type TombstonesSet struct { oldestTombstoneStart, newestTombstoneEnd model.Time // Used as optimization to find whether we want to iterate over tombstones or not } +func NewTombstoneSet(t []DeleteRequest, start model.Time, end model.Time) *TombstonesSet { + return &TombstonesSet{ + tombstones: t, + oldestTombstoneStart: start, + newestTombstoneEnd: end, + } +} + // Used for easier injection of mocks. type DeleteStoreAPI interface { getCacheGenerationNumbers(ctx context.Context, user string) (*cacheGenNumbers, error) diff --git a/pkg/compactor/blocks_cleaner.go b/pkg/compactor/blocks_cleaner.go index 0ebd79aa83..cab614eec2 100644 --- a/pkg/compactor/blocks_cleaner.go +++ b/pkg/compactor/blocks_cleaner.go @@ -40,6 +40,7 @@ type BlocksCleaner struct { logger log.Logger bucketClient objstore.Bucket usersScanner *cortex_tsdb.UsersScanner + bucketCfg cortex_tsdb.BucketStoreConfig // Keep track of the last owned users. lastOwnedUsers []string @@ -58,10 +59,11 @@ type BlocksCleaner struct { tenantBucketIndexLastUpdate *prometheus.GaugeVec } -func NewBlocksCleaner(cfg BlocksCleanerConfig, bucketClient objstore.Bucket, usersScanner *cortex_tsdb.UsersScanner, cfgProvider ConfigProvider, logger log.Logger, reg prometheus.Registerer) *BlocksCleaner { +func NewBlocksCleaner(cfg BlocksCleanerConfig, bucketClient objstore.Bucket, usersScanner *cortex_tsdb.UsersScanner, cfgProvider ConfigProvider, bktCfg cortex_tsdb.BucketStoreConfig, logger log.Logger, reg prometheus.Registerer) *BlocksCleaner { c := &BlocksCleaner{ cfg: cfg, bucketClient: bucketClient, + bucketCfg: bktCfg, usersScanner: usersScanner, cfgProvider: cfgProvider, logger: log.With(logger, "component", "cleaner"), @@ -329,7 +331,7 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userID string, firstRun b } // Generate an updated in-memory version of the bucket index. - w := bucketindex.NewUpdater(c.bucketClient, userID, c.cfgProvider, c.logger) + w := bucketindex.NewUpdater(c.bucketClient, userID, c.cfgProvider, c.cfg.DeletionDelay, c.cfg.CleanupInterval, c.bucketCfg, c.logger) idx, partials, err := w.UpdateIndex(ctx, idx) if err != nil { return err diff --git a/pkg/compactor/blocks_cleaner_test.go b/pkg/compactor/blocks_cleaner_test.go index 7576c6a6d8..d5d84aa30b 100644 --- a/pkg/compactor/blocks_cleaner_test.go +++ b/pkg/compactor/blocks_cleaner_test.go @@ -22,6 +22,7 @@ import ( "github.com/thanos-io/thanos/pkg/objstore" "github.com/cortexproject/cortex/pkg/storage/tsdb" + cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb" "github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex" cortex_testutil "github.com/cortexproject/cortex/pkg/storage/tsdb/testutil" "github.com/cortexproject/cortex/pkg/util" @@ -98,6 +99,16 @@ func testBlocksCleanerWithOptions(t *testing.T, options testBlocksCleanerOptions user4DebugMetaFile := path.Join("user-4", block.DebugMetas, "meta.json") require.NoError(t, bucketClient.Upload(context.Background(), user4DebugMetaFile, strings.NewReader("some random content here"))) + // create sample tombstones + tombstone1 := cortex_tsdb.NewTombstone("user-1", 100, 100, 0, 15, []string{"series1"}, "request1", cortex_tsdb.StatePending) + tombstone2 := cortex_tsdb.NewTombstone("user-1", 100, 100, 0, 15, []string{"series2"}, "request2", cortex_tsdb.StatePending) + tombstone3 := cortex_tsdb.NewTombstone("user-2", 100, 100, 0, 15, []string{"series1"}, "request3", cortex_tsdb.StatePending) + tombstone4 := cortex_tsdb.NewTombstone("user-2", 100, 100, 0, 15, []string{"series2"}, "request4", cortex_tsdb.StateCancelled) + uploadTombstone(t, bucketClient, "user-1", tombstone1) + uploadTombstone(t, bucketClient, "user-1", tombstone2) + uploadTombstone(t, bucketClient, "user-2", tombstone3) + uploadTombstone(t, bucketClient, "user-2", tombstone4) + // The fixtures have been created. If the bucket client wasn't wrapped to write // deletion marks to the global location too, then this is the right time to do it. if options.markersMigrationEnabled { @@ -117,7 +128,7 @@ func testBlocksCleanerWithOptions(t *testing.T, options testBlocksCleanerOptions scanner := tsdb.NewUsersScanner(bucketClient, tsdb.AllUsers, logger) cfgProvider := newMockConfigProvider() - cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, reg) + cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, newMockBucketStoreCfg(), logger, reg) require.NoError(t, services.StartAndAwaitRunning(ctx, cleaner)) defer services.StopAndAwaitTerminated(ctx, cleaner) //nolint:errcheck @@ -166,21 +177,26 @@ func testBlocksCleanerWithOptions(t *testing.T, options testBlocksCleanerOptions // Check the updated bucket index. for _, tc := range []struct { - userID string - expectedIndex bool - expectedBlocks []ulid.ULID - expectedMarks []ulid.ULID + userID string + expectedIndex bool + expectedBlocks []ulid.ULID + expectedMarks []ulid.ULID + expectedTombstonesIDs []string }{ { - userID: "user-1", - expectedIndex: true, - expectedBlocks: []ulid.ULID{block1, block2 /* deleted: block3, block4, block5, partial: block6 */}, - expectedMarks: []ulid.ULID{block2}, + userID: "user-1", + expectedIndex: true, + expectedBlocks: []ulid.ULID{block1, block2 /* deleted: block3, block4, block5, partial: block6 */}, + expectedMarks: []ulid.ULID{block2}, + expectedTombstonesIDs: []string{"request1", "request2"}, }, { - userID: "user-2", - expectedIndex: true, - expectedBlocks: []ulid.ULID{block8}, - expectedMarks: []ulid.ULID{}, + userID: "user-2", + expectedIndex: true, + expectedBlocks: []ulid.ULID{block8}, + expectedMarks: []ulid.ULID{}, + expectedTombstonesIDs: []string{"request3"}, + // request4 should not be included because it is + // cancelled and should not be used for query filtering }, { userID: "user-3", expectedIndex: false, @@ -195,6 +211,7 @@ func testBlocksCleanerWithOptions(t *testing.T, options testBlocksCleanerOptions require.NoError(t, err) assert.ElementsMatch(t, tc.expectedBlocks, idx.Blocks.GetULIDs()) assert.ElementsMatch(t, tc.expectedMarks, idx.BlockDeletionMarks.GetULIDs()) + assert.ElementsMatch(t, tc.expectedTombstonesIDs, idx.Tombstones.GetRequestIDs()) } assert.NoError(t, prom_testutil.GatherAndCompare(reg, strings.NewReader(` @@ -251,7 +268,7 @@ func TestBlocksCleaner_ShouldContinueOnBlockDeletionFailure(t *testing.T) { scanner := tsdb.NewUsersScanner(bucketClient, tsdb.AllUsers, logger) cfgProvider := newMockConfigProvider() - cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, nil) + cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, newMockBucketStoreCfg(), logger, nil) require.NoError(t, services.StartAndAwaitRunning(ctx, cleaner)) defer services.StopAndAwaitTerminated(ctx, cleaner) //nolint:errcheck @@ -297,6 +314,10 @@ func TestBlocksCleaner_ShouldRebuildBucketIndexOnCorruptedOne(t *testing.T) { block3 := createTSDBBlock(t, bucketClient, userID, 30, 40, nil) createDeletionMark(t, bucketClient, userID, block2, now.Add(-deletionDelay).Add(-time.Hour)) createDeletionMark(t, bucketClient, userID, block3, now.Add(-deletionDelay).Add(time.Hour)) + tombstone1 := cortex_tsdb.NewTombstone(userID, 100, 100, 0, 15, []string{"series1"}, "request1", cortex_tsdb.StatePending) + tombstone2 := cortex_tsdb.NewTombstone(userID, 100, 100, 0, 15, []string{"series2"}, "request2", cortex_tsdb.StatePending) + uploadTombstone(t, bucketClient, userID, tombstone1) + uploadTombstone(t, bucketClient, userID, tombstone2) // Write a corrupted bucket index. require.NoError(t, bucketClient.Upload(ctx, path.Join(userID, bucketindex.IndexCompressedFilename), strings.NewReader("invalid!}"))) @@ -311,7 +332,7 @@ func TestBlocksCleaner_ShouldRebuildBucketIndexOnCorruptedOne(t *testing.T) { scanner := tsdb.NewUsersScanner(bucketClient, tsdb.AllUsers, logger) cfgProvider := newMockConfigProvider() - cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, nil) + cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, newMockBucketStoreCfg(), logger, nil) require.NoError(t, services.StartAndAwaitRunning(ctx, cleaner)) defer services.StopAndAwaitTerminated(ctx, cleaner) //nolint:errcheck @@ -339,6 +360,7 @@ func TestBlocksCleaner_ShouldRebuildBucketIndexOnCorruptedOne(t *testing.T) { require.NoError(t, err) assert.ElementsMatch(t, []ulid.ULID{block1, block3}, idx.Blocks.GetULIDs()) assert.ElementsMatch(t, []ulid.ULID{block3}, idx.BlockDeletionMarks.GetULIDs()) + assert.ElementsMatch(t, []string{"request1", "request2"}, idx.Tombstones.GetRequestIDs()) } func TestBlocksCleaner_ShouldRemoveMetricsForTenantsNotBelongingAnymoreToTheShard(t *testing.T) { @@ -362,7 +384,7 @@ func TestBlocksCleaner_ShouldRemoveMetricsForTenantsNotBelongingAnymoreToTheShar scanner := tsdb.NewUsersScanner(bucketClient, tsdb.AllUsers, logger) cfgProvider := newMockConfigProvider() - cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, reg) + cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, newMockBucketStoreCfg(), logger, reg) require.NoError(t, cleaner.cleanUsers(ctx, true)) assert.NoError(t, prom_testutil.GatherAndCompare(reg, strings.NewReader(` @@ -420,7 +442,7 @@ func TestBlocksCleaner_ListBlocksOutsideRetentionPeriod(t *testing.T) { id2 := createTSDBBlock(t, bucketClient, "user-1", 6000, 7000, nil) id3 := createTSDBBlock(t, bucketClient, "user-1", 7000, 8000, nil) - w := bucketindex.NewUpdater(bucketClient, "user-1", nil, logger) + w := bucketindex.NewUpdater(bucketClient, "user-1", nil, 0, 0, newMockBucketStoreCfg(), logger) idx, _, err := w.UpdateIndex(ctx, nil) require.NoError(t, err) @@ -493,7 +515,7 @@ func TestBlocksCleaner_ShouldRemoveBlocksOutsideRetentionPeriod(t *testing.T) { scanner := tsdb.NewUsersScanner(bucketClient, tsdb.AllUsers, logger) cfgProvider := newMockConfigProvider() - cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, reg) + cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, newMockBucketStoreCfg(), logger, reg) assertBlockExists := func(user string, block ulid.ULID, expectExists bool) { exists, err := bucketClient.Exists(ctx, path.Join(user, block.String(), metadata.MetaFilename)) @@ -681,3 +703,12 @@ func (m *mockConfigProvider) S3SSEKMSKeyID(userID string) string { func (m *mockConfigProvider) S3SSEKMSEncryptionContext(userID string) string { return "" } + +func newMockBucketStoreCfg() cortex_tsdb.BucketStoreConfig { + return cortex_tsdb.BucketStoreConfig{ + SyncInterval: time.Minute, + BucketIndex: cortex_tsdb.BucketIndexConfig{ + MaxStalePeriod: time.Hour, + }, + } +} diff --git a/pkg/compactor/compactor.go b/pkg/compactor/compactor.go index 51952688cb..37f88ea8e5 100644 --- a/pkg/compactor/compactor.go +++ b/pkg/compactor/compactor.go @@ -360,7 +360,7 @@ func (c *Compactor) starting(ctx context.Context) error { CleanupConcurrency: c.compactorCfg.CleanupConcurrency, BlockDeletionMarksMigrationEnabled: c.compactorCfg.BlockDeletionMarksMigrationEnabled, TenantCleanupDelay: c.compactorCfg.TenantCleanupDelay, - }, c.bucketClient, c.usersScanner, c.cfgProvider, c.parentLogger, c.registerer) + }, c.bucketClient, c.usersScanner, c.cfgProvider, c.storageCfg.BucketStore, c.parentLogger, c.registerer) // Initialize the compactors ring if sharding is enabled. if c.compactorCfg.ShardingEnabled { diff --git a/pkg/compactor/compactor_test.go b/pkg/compactor/compactor_test.go index 9c14cb0924..fbc0fed1ed 100644 --- a/pkg/compactor/compactor_test.go +++ b/pkg/compactor/compactor_test.go @@ -422,6 +422,7 @@ func TestCompactor_ShouldIncrementCompactionErrorIfFailedToCompactASingleTenant( bucketClient.MockIter("", []string{userID}, nil) bucketClient.MockIter(userID+"/", []string{userID + "/01DTVP434PA9VFXSW2JKB3392D"}, nil) bucketClient.MockIter(userID+"/markers/", nil, nil) + bucketClient.MockIter(userID+"/tombstones/", nil, nil) bucketClient.MockExists(path.Join(userID, cortex_tsdb.TenantDeletionMarkPath), false, nil) bucketClient.MockGet(userID+"/01DTVP434PA9VFXSW2JKB3392D/meta.json", mockBlockMetaJSON("01DTVP434PA9VFXSW2JKB3392D"), nil) bucketClient.MockGet(userID+"/01DTVP434PA9VFXSW2JKB3392D/deletion-mark.json", "", nil) @@ -476,6 +477,8 @@ func TestCompactor_ShouldIterateOverUsersAndRunCompaction(t *testing.T) { bucketClient.MockGet("user-2/bucket-index.json.gz", "", nil) bucketClient.MockIter("user-1/markers/", nil, nil) bucketClient.MockIter("user-2/markers/", nil, nil) + bucketClient.MockIter("user-1/tombstones/", nil, nil) + bucketClient.MockIter("user-2/tombstones/", nil, nil) bucketClient.MockUpload("user-1/bucket-index.json.gz", nil) bucketClient.MockUpload("user-2/bucket-index.json.gz", nil) @@ -605,6 +608,7 @@ func TestCompactor_ShouldNotCompactBlocksMarkedForDeletion(t *testing.T) { bucketClient.MockDelete("user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ", nil) bucketClient.MockGet("user-1/bucket-index.json.gz", "", nil) bucketClient.MockUpload("user-1/bucket-index.json.gz", nil) + bucketClient.MockIter("user-1/tombstones/", nil, nil) c, _, tsdbPlanner, logs, registry := prepare(t, cfg, bucketClient) @@ -713,6 +717,7 @@ func TestCompactor_ShouldNotCompactBlocksForUsersMarkedForDeletion(t *testing.T) bucketClient.MockDelete("user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", nil) bucketClient.MockDelete("user-1/01DTVP434PA9VFXSW2JKB3392D/index", nil) bucketClient.MockDelete("user-1/bucket-index.json.gz", nil) + bucketClient.MockIter("user-1/tombstones/", nil, nil) c, _, tsdbPlanner, logs, registry := prepare(t, cfg, bucketClient) @@ -808,6 +813,8 @@ func TestCompactor_ShouldCompactAllUsersOnShardingEnabledButOnlyOneInstanceRunni bucketClient.MockIter("user-2/", []string{"user-2/01DTW0ZCPDDNV4BV83Q2SV4QAZ"}, nil) bucketClient.MockIter("user-1/markers/", nil, nil) bucketClient.MockIter("user-2/markers/", nil, nil) + bucketClient.MockIter("user-1/tombstones/", nil, nil) + bucketClient.MockIter("user-2/tombstones/", nil, nil) bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", mockBlockMetaJSON("01DTVP434PA9VFXSW2JKB3392D"), nil) bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/deletion-mark.json", "", nil) bucketClient.MockGet("user-2/01DTW0ZCPDDNV4BV83Q2SV4QAZ/meta.json", mockBlockMetaJSON("01DTW0ZCPDDNV4BV83Q2SV4QAZ"), nil) @@ -886,6 +893,7 @@ func TestCompactor_ShouldCompactOnlyUsersOwnedByTheInstanceOnShardingEnabledAndM for _, userID := range userIDs { bucketClient.MockIter(userID+"/", []string{userID + "/01DTVP434PA9VFXSW2JKB3392D"}, nil) bucketClient.MockIter(userID+"/markers/", nil, nil) + bucketClient.MockIter(userID+"/tombstones/", nil, nil) bucketClient.MockExists(path.Join(userID, cortex_tsdb.TenantDeletionMarkPath), false, nil) bucketClient.MockGet(userID+"/01DTVP434PA9VFXSW2JKB3392D/meta.json", mockBlockMetaJSON("01DTVP434PA9VFXSW2JKB3392D"), nil) bucketClient.MockGet(userID+"/01DTVP434PA9VFXSW2JKB3392D/deletion-mark.json", "", nil) @@ -1031,6 +1039,16 @@ func createDeletionMark(t *testing.T, bkt objstore.Bucket, userID string, blockI require.NoError(t, bkt.Upload(context.Background(), markPath, strings.NewReader(content))) } +func uploadTombstone(t *testing.T, bkt objstore.Bucket, userID string, tombstone *cortex_tsdb.Tombstone) { + tombstoneFilename := tombstone.GetFilename() + path := path.Join(userID, cortex_tsdb.TombstonePath, tombstoneFilename) + data, err := json.Marshal(tombstone) + + require.NoError(t, err) + require.NoError(t, bkt.Upload(context.Background(), path, bytes.NewReader(data))) + +} + func findCompactorByUserID(compactors []*Compactor, logs []*concurrency.SyncBuffer, userID string) (*Compactor, *concurrency.SyncBuffer, error) { var compactor *Compactor var log *concurrency.SyncBuffer diff --git a/pkg/querier/blocks_finder_bucket_index.go b/pkg/querier/blocks_finder_bucket_index.go index 368ea60420..baa18d0e10 100644 --- a/pkg/querier/blocks_finder_bucket_index.go +++ b/pkg/querier/blocks_finder_bucket_index.go @@ -2,14 +2,19 @@ package querier import ( "context" + "math" "time" + cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb" "github.com/go-kit/kit/log" "github.com/oklog/ulid" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/pkg/labels" "github.com/thanos-io/thanos/pkg/objstore" + "github.com/cortexproject/cortex/pkg/chunk/purger" "github.com/cortexproject/cortex/pkg/storage/bucket" "github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex" "github.com/cortexproject/cortex/pkg/util/services" @@ -46,6 +51,7 @@ func NewBucketIndexBlocksFinder(cfg BucketIndexBlocksFinderConfig, bkt objstore. } // GetBlocks implements BlocksFinder. +// ILAN IMPORTANT func (f *BucketIndexBlocksFinder) GetBlocks(ctx context.Context, userID string, minT, maxT int64) (bucketindex.Blocks, map[ulid.ULID]*bucketindex.BlockDeletionMark, error) { if f.State() != services.Running { return nil, nil, errBucketIndexBlocksFinderNotRunning @@ -107,3 +113,70 @@ func (f *BucketIndexBlocksFinder) GetBlocks(ctx context.Context, userID string, return blocks, matchingDeletionMarks, nil } + +func (f *BucketIndexBlocksFinder) GetTombstones(ctx context.Context, userID string, minT int64, maxT int64) (*purger.TombstonesSet, error) { + if f.State() != services.Running { + return nil, errBucketIndexBlocksFinderNotRunning + } + if maxT < minT { + return nil, errInvalidBlocksRange + } + + // Get the bucket index for this user. + idx, err := f.loader.GetIndex(ctx, userID) + if errors.Is(err, bucketindex.ErrIndexNotFound) { + // This is a legit edge case, happening when a new tenant has not shipped blocks to the storage yet + // so the bucket index hasn't been created yet. + return nil, nil + } + if err != nil { + return nil, err + } + + // Ensure the bucket index is not too old. + if time.Since(idx.GetUpdatedAt()) > f.cfg.MaxStalePeriod { + return nil, errBucketIndexTooOld + } + + tConverted := []purger.DeleteRequest{} + var tMinTime int64 = math.MaxInt64 + var tMaxTime int64 = math.MinInt64 + for _, t := range idx.Tombstones { + if !t.IsOverlappingInterval(minT, maxT) { + continue + } + + // Convert the tombstone into a deletion request which was implemented for chunk store deletion + // This will allow many of the query filtering code to be shared among block/chunk store + matchers, err := cortex_tsdb.ParseMatchers(t.Selectors) + if err != nil { + return nil, errors.Wrapf(err, "failed to parse tombstone selectors for: %s", t.RequestID) + } + + request := purger.DeleteRequest{ + StartTime: model.Time(t.StartTime), + EndTime: model.Time(t.EndTime), + Matchers: [][]*labels.Matcher{matchers}, + } + tConverted = append(tConverted, request) + + if t.StartTime < tMinTime { + tMinTime = t.StartTime + } + if t.EndTime > tMaxTime { + tMaxTime = t.EndTime + } + } + + // Reduce the interval that tombstone will be applied if possible + if minT > tMinTime { + tMinTime = minT + } + if maxT < tMaxTime { + tMaxTime = maxT + } + tombstoneSet := purger.NewTombstoneSet(tConverted, model.Time(tMinTime), model.Time(tMaxTime)) + + return tombstoneSet, nil + +} diff --git a/pkg/querier/blocks_finder_bucket_index_test.go b/pkg/querier/blocks_finder_bucket_index_test.go index 151ff2a3be..5a4ed76178 100644 --- a/pkg/querier/blocks_finder_bucket_index_test.go +++ b/pkg/querier/blocks_finder_bucket_index_test.go @@ -2,6 +2,7 @@ package querier import ( "context" + "math" "path" "strings" "testing" @@ -9,10 +10,14 @@ import ( "github.com/go-kit/kit/log" "github.com/oklog/ulid" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/pkg/labels" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/thanos-io/thanos/pkg/objstore" + "github.com/cortexproject/cortex/pkg/chunk/purger" + cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb" "github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex" cortex_testutil "github.com/cortexproject/cortex/pkg/storage/tsdb/testutil" "github.com/cortexproject/cortex/pkg/util/services" @@ -153,7 +158,104 @@ func BenchmarkBucketIndexBlocksFinder_GetBlocks(b *testing.B) { } } -func TestBucketIndexBlocksFinder_GetBlocks_BucketIndexDoesNotExist(t *testing.T) { +func TestBucketIndexBlocksFinder_GetTombstones(t *testing.T) { + const userID = "user-1" + + ctx := context.Background() + bkt, _ := cortex_testutil.PrepareFilesystemBucket(t) + + // Mock a bucket index. + t1 := cortex_tsdb.NewTombstone(userID, 0, 0, 10, 15, []string{"series1"}, "request1", cortex_tsdb.StatePending) + t1.Matchers, _ = cortex_tsdb.ParseMatchers(t1.Selectors) + t2 := cortex_tsdb.NewTombstone(userID, 0, 0, 12, 20, []string{"series2"}, "request2", cortex_tsdb.StatePending) + t2.Matchers, _ = cortex_tsdb.ParseMatchers(t2.Selectors) + t3 := cortex_tsdb.NewTombstone(userID, 0, 0, 20, 30, []string{"series3"}, "request3", cortex_tsdb.StatePending) + t3.Matchers, _ = cortex_tsdb.ParseMatchers(t3.Selectors) + t4 := cortex_tsdb.NewTombstone(userID, 0, 0, 30, 40, []string{"series4"}, "request4", cortex_tsdb.StatePending) + t4.Matchers, _ = cortex_tsdb.ParseMatchers(t4.Selectors) + + require.NoError(t, bucketindex.WriteIndex(ctx, bkt, userID, nil, &bucketindex.Index{ + Version: bucketindex.IndexVersion1, + Tombstones: bucketindex.SeriesDeletionTombstones{t1, t2, t3, t4}, + UpdatedAt: time.Now().Unix(), + })) + + finder := prepareBucketIndexBlocksFinder(t, bkt) + + tests := map[string]struct { + minT int64 + maxT int64 + expectedTombstoneSet *purger.TombstonesSet + }{ + "no matching tombstones because the range is too low": { + minT: 0, + maxT: 5, + expectedTombstoneSet: purger.NewTombstoneSet([]purger.DeleteRequest{}, math.MaxInt64, math.MinInt64), + }, + "no matching tombstones because the range is too high": { + minT: 50, + maxT: 60, + expectedTombstoneSet: purger.NewTombstoneSet([]purger.DeleteRequest{}, math.MaxInt64, math.MinInt64), + }, + "matching all tombstones": { + minT: 0, + maxT: 60, + expectedTombstoneSet: purger.NewTombstoneSet([]purger.DeleteRequest{ + {StartTime: model.Time(t1.StartTime), EndTime: model.Time(t1.EndTime), Matchers: [][]*labels.Matcher{t1.Matchers}}, + {StartTime: model.Time(t2.StartTime), EndTime: model.Time(t2.EndTime), Matchers: [][]*labels.Matcher{t2.Matchers}}, + {StartTime: model.Time(t3.StartTime), EndTime: model.Time(t3.EndTime), Matchers: [][]*labels.Matcher{t3.Matchers}}, + {StartTime: model.Time(t4.StartTime), EndTime: model.Time(t4.EndTime), Matchers: [][]*labels.Matcher{t4.Matchers}}, + }, model.Time(t1.StartTime), model.Time(t4.EndTime)), + }, + "query range starting at a tombstone end time": { + minT: t3.EndTime, + maxT: 60, + expectedTombstoneSet: purger.NewTombstoneSet([]purger.DeleteRequest{ + {StartTime: model.Time(t4.StartTime), EndTime: model.Time(t4.EndTime), Matchers: [][]*labels.Matcher{t4.Matchers}}, + }, model.Time(t4.StartTime), model.Time(t4.EndTime)), + }, + "query range ending at a tombstone start time": { + minT: t3.StartTime, + maxT: t4.EndTime, + expectedTombstoneSet: purger.NewTombstoneSet([]purger.DeleteRequest{ + {StartTime: model.Time(t3.StartTime), EndTime: model.Time(t3.EndTime), Matchers: [][]*labels.Matcher{t3.Matchers}}, + {StartTime: model.Time(t4.StartTime), EndTime: model.Time(t4.EndTime), Matchers: [][]*labels.Matcher{t4.Matchers}}, + }, model.Time(t3.StartTime), model.Time(t4.EndTime)), + }, + "query range within a single tombstone": { + minT: t3.StartTime + 2, + maxT: t3.EndTime - 2, + expectedTombstoneSet: purger.NewTombstoneSet([]purger.DeleteRequest{ + {StartTime: model.Time(t3.StartTime), EndTime: model.Time(t3.EndTime), Matchers: [][]*labels.Matcher{t3.Matchers}}, + }, model.Time(t3.StartTime+2), model.Time(t3.EndTime-2)), + }, + "query range within multiple tombstones": { + minT: 13, + maxT: 16, + expectedTombstoneSet: purger.NewTombstoneSet([]purger.DeleteRequest{ + {StartTime: model.Time(t1.StartTime), EndTime: model.Time(t1.EndTime), Matchers: [][]*labels.Matcher{t1.Matchers}}, + {StartTime: model.Time(t2.StartTime), EndTime: model.Time(t2.EndTime), Matchers: [][]*labels.Matcher{t2.Matchers}}, + }, 13, 16), + }, + "query range matching exactly a single block": { + minT: t3.StartTime, + maxT: t3.EndTime - 1, + expectedTombstoneSet: purger.NewTombstoneSet([]purger.DeleteRequest{ + {StartTime: model.Time(t3.StartTime), EndTime: model.Time(t3.EndTime), Matchers: [][]*labels.Matcher{t3.Matchers}}, + }, model.Time(t3.StartTime), model.Time(t3.EndTime-1)), + }, + } + + for testName, testData := range tests { + t.Run(testName, func(t *testing.T) { + tombstoneSet, err := finder.GetTombstones(ctx, userID, testData.minT, testData.maxT) + require.NoError(t, err) + require.Equal(t, testData.expectedTombstoneSet, tombstoneSet) + }) + } +} + +func TestBucketIndexBlocksFinder_BucketIndexDoesNotExist(t *testing.T) { const userID = "user-1" ctx := context.Background() @@ -164,9 +266,13 @@ func TestBucketIndexBlocksFinder_GetBlocks_BucketIndexDoesNotExist(t *testing.T) require.NoError(t, err) assert.Empty(t, blocks) assert.Empty(t, deletionMarks) + + tombstoneSet, err := finder.GetTombstones(ctx, userID, 10, 20) + require.NoError(t, err) + assert.Empty(t, tombstoneSet) } -func TestBucketIndexBlocksFinder_GetBlocks_BucketIndexIsCorrupted(t *testing.T) { +func TestBucketIndexBlocksFinder_BucketIndexIsCorrupted(t *testing.T) { const userID = "user-1" ctx := context.Background() @@ -178,9 +284,12 @@ func TestBucketIndexBlocksFinder_GetBlocks_BucketIndexIsCorrupted(t *testing.T) _, _, err := finder.GetBlocks(ctx, userID, 10, 20) require.Equal(t, bucketindex.ErrIndexCorrupted, err) + + _, err = finder.GetTombstones(ctx, userID, 10, 20) + require.Equal(t, bucketindex.ErrIndexCorrupted, err) } -func TestBucketIndexBlocksFinder_GetBlocks_BucketIndexIsTooOld(t *testing.T) { +func TestBucketIndexBlocksFinder_BucketIndexIsTooOld(t *testing.T) { const userID = "user-1" ctx := context.Background() @@ -196,6 +305,9 @@ func TestBucketIndexBlocksFinder_GetBlocks_BucketIndexIsTooOld(t *testing.T) { _, _, err := finder.GetBlocks(ctx, userID, 10, 20) require.Equal(t, errBucketIndexTooOld, err) + + _, err = finder.GetTombstones(ctx, userID, 10, 20) + require.Equal(t, errBucketIndexTooOld, err) } func prepareBucketIndexBlocksFinder(t testing.TB, bkt objstore.Bucket) *BucketIndexBlocksFinder { diff --git a/pkg/querier/blocks_finder_bucket_scan.go b/pkg/querier/blocks_finder_bucket_scan.go index bfa2086313..0a6efa9289 100644 --- a/pkg/querier/blocks_finder_bucket_scan.go +++ b/pkg/querier/blocks_finder_bucket_scan.go @@ -20,6 +20,7 @@ import ( "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/objstore" + "github.com/cortexproject/cortex/pkg/chunk/purger" "github.com/cortexproject/cortex/pkg/storage/bucket" cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb" "github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex" @@ -150,6 +151,11 @@ func (d *BucketScanBlocksFinder) GetBlocks(_ context.Context, userID string, min return matchingMetas, matchingDeletionMarks, nil } +func (d *BucketScanBlocksFinder) GetTombstones(_ context.Context, _ string, _ int64, _ int64) (*purger.TombstonesSet, error) { + level.Warn(util_log.Logger).Log("msg", "Series deletion query time Filtering is not supported without bucket index enabled") + return nil, nil +} + func (d *BucketScanBlocksFinder) starting(ctx context.Context) error { // Before the service is in the running state it must have successfully // complete the initial scan. diff --git a/pkg/querier/blocks_store_queryable.go b/pkg/querier/blocks_store_queryable.go index 6ed8030105..14c8524463 100644 --- a/pkg/querier/blocks_store_queryable.go +++ b/pkg/querier/blocks_store_queryable.go @@ -16,6 +16,7 @@ import ( "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/prometheus/common/model" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/storage" "github.com/thanos-io/thanos/pkg/block" @@ -27,6 +28,7 @@ import ( "golang.org/x/sync/errgroup" grpc_metadata "google.golang.org/grpc/metadata" + "github.com/cortexproject/cortex/pkg/chunk/purger" "github.com/cortexproject/cortex/pkg/cortexpb" "github.com/cortexproject/cortex/pkg/querier/series" "github.com/cortexproject/cortex/pkg/ring" @@ -75,6 +77,9 @@ type BlocksFinder interface { // GetBlocks returns known blocks for userID containing samples within the range minT // and maxT (milliseconds, both included). Returned blocks are sorted by MaxTime descending. GetBlocks(ctx context.Context, userID string, minT, maxT int64) (bucketindex.Blocks, map[ulid.ULID]*bucketindex.BlockDeletionMark, error) + + // // GetTombstones returns all the tombstones that are currently required for filtering deleted series + GetTombstones(ctx context.Context, userID string, minT, maxT int64) (*purger.TombstonesSet, error) } // BlocksStoreClient is the interface that should be implemented by any client used @@ -405,8 +410,8 @@ func (q *blocksStoreQuerier) selectSorted(sp *storage.SelectHints, matchers ...* maxChunksLimit = q.limits.MaxChunksPerQueryFromStore(q.userID) leftChunksLimit = maxChunksLimit - - resultMtx sync.Mutex + tombstoneSet *purger.TombstonesSet + resultMtx sync.Mutex ) queryFunc := func(clients map[BlocksStoreClient][]ulid.ULID, minT, maxT int64) ([]ulid.ULID, error) { @@ -437,11 +442,21 @@ func (q *blocksStoreQuerier) selectSorted(sp *storage.SelectHints, matchers ...* if len(resSeriesSets) == 0 { storage.EmptySeriesSet() + } else { + tombstoneSet, err = q.finder.GetTombstones(spanCtx, q.userID, minT, maxT) + if err != nil { + return storage.ErrSeriesSet(err) + } } - return series.NewSeriesSetWithWarnings( - storage.NewMergeSeriesSet(resSeriesSets, storage.ChainedSeriesMerge), - resWarnings) + seriesSet := storage.NewMergeSeriesSet(resSeriesSets, storage.ChainedSeriesMerge) + + if tombstoneSet != nil && tombstoneSet.Len() != 0 { + seriesSet = series.NewDeletedSeriesSet(seriesSet, tombstoneSet, model.Interval{Start: model.Time(minT), End: model.Time(maxT)}) + } + + return series.NewSeriesSetWithWarnings(seriesSet, resWarnings) + } func (q *blocksStoreQuerier) queryWithConsistencyCheck(ctx context.Context, logger log.Logger, minT, maxT int64, diff --git a/pkg/storage/tsdb/bucketindex/index.go b/pkg/storage/tsdb/bucketindex/index.go index 5c5f6cb5d4..f6e0117feb 100644 --- a/pkg/storage/tsdb/bucketindex/index.go +++ b/pkg/storage/tsdb/bucketindex/index.go @@ -38,6 +38,9 @@ type Index struct { // List of block deletion marks. BlockDeletionMarks BlockDeletionMarks `json:"block_deletion_marks"` + // List of tombstones that require query time filtering for deleted series + Tombstones SeriesDeletionTombstones `json:"series_deletion_tombstones"` + // UpdatedAt is a unix timestamp (seconds precision) of when the index has been updated // (written in the storage) the last time. UpdatedAt int64 `json:"updated_at"` @@ -258,3 +261,15 @@ func (s Blocks) String() string { return b.String() } + +// Holds all the tombstones that are currently required to be used for +// filtering queries in the queriers. +type SeriesDeletionTombstones []*cortex_tsdb.Tombstone + +func (s SeriesDeletionTombstones) GetRequestIDs() []string { + ids := make([]string, len(s)) + for i, t := range s { + ids[i] = t.RequestID + } + return ids +} diff --git a/pkg/storage/tsdb/bucketindex/storage_test.go b/pkg/storage/tsdb/bucketindex/storage_test.go index 022dad0f95..55d8e996d7 100644 --- a/pkg/storage/tsdb/bucketindex/storage_test.go +++ b/pkg/storage/tsdb/bucketindex/storage_test.go @@ -51,7 +51,7 @@ func TestReadIndex_ShouldReturnTheParsedIndexOnSuccess(t *testing.T) { testutil.MockStorageDeletionMark(t, bkt, userID, testutil.MockStorageBlock(t, bkt, userID, 30, 40)) // Write the index. - u := NewUpdater(bkt, userID, nil, logger) + u := NewUpdater(bkt, userID, nil, blockDeletionDelay, blocksCleanupInterval, newMockBucketStoreCfg(), logger) expectedIdx, _, err := u.UpdateIndex(ctx, nil) require.NoError(t, err) require.NoError(t, WriteIndex(ctx, bkt, userID, nil, expectedIdx)) @@ -88,7 +88,7 @@ func BenchmarkReadIndex(b *testing.B) { } // Write the index. - u := NewUpdater(bkt, userID, nil, logger) + u := NewUpdater(bkt, userID, nil, blockDeletionDelay, blocksCleanupInterval, newMockBucketStoreCfg(), logger) idx, _, err := u.UpdateIndex(ctx, nil) require.NoError(b, err) require.NoError(b, WriteIndex(ctx, bkt, userID, nil, idx)) diff --git a/pkg/storage/tsdb/bucketindex/updater.go b/pkg/storage/tsdb/bucketindex/updater.go index ec69f10dae..d71ed74e42 100644 --- a/pkg/storage/tsdb/bucketindex/updater.go +++ b/pkg/storage/tsdb/bucketindex/updater.go @@ -5,6 +5,7 @@ import ( "encoding/json" "io/ioutil" "path" + "path/filepath" "time" "github.com/go-kit/kit/log" @@ -17,6 +18,7 @@ import ( "github.com/thanos-io/thanos/pkg/runutil" "github.com/cortexproject/cortex/pkg/storage/bucket" + cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb" util_log "github.com/cortexproject/cortex/pkg/util/log" ) @@ -25,18 +27,34 @@ var ( ErrBlockMetaCorrupted = block.ErrorSyncMetaCorrupted ErrBlockDeletionMarkNotFound = errors.New("block deletion mark not found") ErrBlockDeletionMarkCorrupted = errors.New("block deletion mark corrupted") + ErrTombstoneNotFound = errors.New("Tombstone file not found") + ErrTombstoneCorrupted = errors.New("Tombstone file corrupted") ) // Updater is responsible to generate an update in-memory bucket index. type Updater struct { - bkt objstore.InstrumentedBucket - logger log.Logger + bkt objstore.InstrumentedBucket + blocksDeletionDelay time.Duration + blocksCleanupInterval time.Duration + bktCfg cortex_tsdb.BucketStoreConfig + logger log.Logger } -func NewUpdater(bkt objstore.Bucket, userID string, cfgProvider bucket.TenantConfigProvider, logger log.Logger) *Updater { +func NewUpdater( + bkt objstore.Bucket, + userID string, + cfgProvider bucket.TenantConfigProvider, + deletionDelay time.Duration, + cleanupInterval time.Duration, + bktCfg cortex_tsdb.BucketStoreConfig, + logger log.Logger) *Updater { + return &Updater{ - bkt: bucket.NewUserBucketClient(userID, bkt, cfgProvider), - logger: util_log.WithUserID(userID, logger), + bkt: bucket.NewUserBucketClient(userID, bkt, cfgProvider), + blocksDeletionDelay: deletionDelay, + blocksCleanupInterval: cleanupInterval, + bktCfg: bktCfg, + logger: util_log.WithUserID(userID, logger), } } @@ -45,11 +63,13 @@ func NewUpdater(bkt objstore.Bucket, userID string, cfgProvider bucket.TenantCon func (w *Updater) UpdateIndex(ctx context.Context, old *Index) (*Index, map[ulid.ULID]error, error) { var oldBlocks []*Block var oldBlockDeletionMarks []*BlockDeletionMark + var oldTombstones []*cortex_tsdb.Tombstone // Read the old index, if provided. if old != nil { oldBlocks = old.Blocks oldBlockDeletionMarks = old.BlockDeletionMarks + oldTombstones = old.Tombstones } blocks, partials, err := w.updateBlocks(ctx, oldBlocks) @@ -62,10 +82,16 @@ func (w *Updater) UpdateIndex(ctx context.Context, old *Index) (*Index, map[ulid return nil, nil, err } + tombstones, err := w.updateSeriesDeletionTombstones(ctx, oldTombstones) + if err != nil { + return nil, nil, err + } + return &Index{ Version: IndexVersion1, Blocks: blocks, BlockDeletionMarks: blockDeletionMarks, + Tombstones: tombstones, UpdatedAt: time.Now().Unix(), }, partials, nil } @@ -223,3 +249,117 @@ func (w *Updater) updateBlockDeletionMarkIndexEntry(ctx context.Context, id ulid return BlockDeletionMarkFromThanosMarker(&m), nil } + +// just get the tombstone filename and save that to a discovered list +// do all the caching stuff and remove the old ones from the discovered list. +// Then once we get the final list, remove duplicates +// TODO make it work with the old tombstones to help with caching +func (w *Updater) updateSeriesDeletionTombstones(ctx context.Context, oldTombstones []*cortex_tsdb.Tombstone) ([]*cortex_tsdb.Tombstone, error) { + out := make([]*cortex_tsdb.Tombstone, 0, len(oldTombstones)) + discovered := make(map[string]cortex_tsdb.BlockDeleteRequestState) + + err := w.bkt.Iter(ctx, "tombstones/", func(s string) error { + tName := filepath.Base(s) + requestID, state, err := cortex_tsdb.ParseTombstonePath(tName) + if err != nil { + return err + } + + if prevState, exists := discovered[requestID]; !exists { + discovered[requestID] = state + } else { + // if there is more than one tombstone for a given request, + // we only want to keep track of the one with the latest state + orderA, err := state.GetStateOrder() + if err != nil { + return err + } + orderB, err := prevState.GetStateOrder() + if err != nil { + return err + } + + // If the new state found is the lastest, then we replace the tombstone state in the map + if orderA > orderB { + discovered[requestID] = state + } + } + return nil + }) + + if err != nil { + return nil, err + } + + // Since tombstones are immutable, all tombstones already existing in the index can just be copied. + for _, t := range oldTombstones { + if state, ok := discovered[t.RequestID]; ok && state == t.State { + if w.isTombstoneForFiltering(t) { + out = append(out, t) + } + delete(discovered, t.RequestID) + } + } + + // Remaining tombstones are new ones and we have to fetch them. + for id, state := range discovered { + t, err := w.updateTombstoneIndexEntry(ctx, id, state) + if errors.Is(err, ErrTombstoneNotFound) { + // This could happen if the series deletion cleaner removes the tombstone or the user cancels it between + // the "list objects" and now + level.Warn(w.logger).Log("msg", "skipped missing tombstone file when updating bucket index", "requestID", id, "state", string(state)) + continue + } + if errors.Is(err, ErrTombstoneCorrupted) { + level.Error(w.logger).Log("msg", "skipped corrupted tombstone file when updating bucket index", "requestID", id, "state", state, "err", err) + continue + } + if err != nil { + return nil, err + } + + if w.isTombstoneForFiltering(t) { + out = append(out, t) + } + } + + return out, nil + +} + +func (w *Updater) updateTombstoneIndexEntry(ctx context.Context, requestID string, state cortex_tsdb.BlockDeleteRequestState) (*cortex_tsdb.Tombstone, error) { + + filename := requestID + "." + string(state) + ".json" + t, err := cortex_tsdb.ReadTombstoneFile(ctx, w.bkt, path.Join(cortex_tsdb.TombstonePath, filename)) + if errors.Is(err, cortex_tsdb.ErrTombstoneNotFound) { + return nil, errors.Wrap(ErrTombstoneNotFound, err.Error()) + } + if errors.Is(err, cortex_tsdb.ErrTombstoneDecode) { + return nil, errors.Wrap(ErrTombstoneCorrupted, err.Error()) + } + if err != nil { + return nil, err + } + + return t, nil +} + +func (w *Updater) isTombstoneForFiltering(t *cortex_tsdb.Tombstone) bool { + if t.State == cortex_tsdb.StatePending { + return true + } + + // Once the tombstone has been processed for permanent deletions, the data has been rewritten and + // the old block have been marked for deletion. + // The tombstones need to be used for query time filtering until we can guarantee that the queriers + // have picked up the new blocks and no longer will query any of the deleted blocks. + // This time should be enough to guarantee that the new blocks will be queried: + filterTimeAfterProcessed := w.bktCfg.SyncInterval.Milliseconds() + w.blocksDeletionDelay.Milliseconds() + w.blocksCleanupInterval.Milliseconds() + timePassedSinceProcessed := (time.Now().Unix() * 1000) - t.StateCreatedAt + + if t.State == cortex_tsdb.StateProcessed && filterTimeAfterProcessed > timePassedSinceProcessed { + return true + } + + return false +} diff --git a/pkg/storage/tsdb/bucketindex/updater_test.go b/pkg/storage/tsdb/bucketindex/updater_test.go index 93b0135821..8369f7ad80 100644 --- a/pkg/storage/tsdb/bucketindex/updater_test.go +++ b/pkg/storage/tsdb/bucketindex/updater_test.go @@ -7,6 +7,7 @@ import ( "testing" "time" + cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb" "github.com/go-kit/kit/log" "github.com/oklog/ulid" "github.com/pkg/errors" @@ -21,6 +22,11 @@ import ( "github.com/cortexproject/cortex/pkg/storage/tsdb/testutil" ) +const ( + blockDeletionDelay = time.Minute * 5 + blocksCleanupInterval = time.Minute * 10 +) + func TestUpdater_UpdateIndex(t *testing.T) { const userID = "user-1" @@ -34,33 +40,40 @@ func TestUpdater_UpdateIndex(t *testing.T) { block1 := testutil.MockStorageBlock(t, bkt, userID, 10, 20) block2 := testutil.MockStorageBlock(t, bkt, userID, 20, 30) block2Mark := testutil.MockStorageDeletionMark(t, bkt, userID, block2) + tombstone1 := testutil.MockTombstone(t, bkt, userID, 0, 0, 0, 0, []string{"series"}, "request1", cortex_tsdb.StatePending) + tombstone2 := testutil.MockTombstone(t, bkt, userID, 0, 0, 0, 0, []string{"series"}, "request2", cortex_tsdb.StatePending) - w := NewUpdater(bkt, userID, nil, logger) + w := NewUpdater(bkt, userID, nil, blockDeletionDelay, blocksCleanupInterval, newMockBucketStoreCfg(), logger) returnedIdx, _, err := w.UpdateIndex(ctx, nil) require.NoError(t, err) assertBucketIndexEqual(t, returnedIdx, bkt, userID, []tsdb.BlockMeta{block1, block2}, - []*metadata.DeletionMark{block2Mark}) + []*metadata.DeletionMark{block2Mark}, + []*cortex_tsdb.Tombstone{tombstone1, tombstone2}) // Create new blocks, and update the index. block3 := testutil.MockStorageBlock(t, bkt, userID, 30, 40) block4 := testutil.MockStorageBlock(t, bkt, userID, 40, 50) block4Mark := testutil.MockStorageDeletionMark(t, bkt, userID, block4) + tombstone3 := testutil.MockTombstone(t, bkt, userID, 0, 0, 0, 0, []string{"series"}, "request3", cortex_tsdb.StatePending) returnedIdx, _, err = w.UpdateIndex(ctx, returnedIdx) require.NoError(t, err) assertBucketIndexEqual(t, returnedIdx, bkt, userID, []tsdb.BlockMeta{block1, block2, block3, block4}, - []*metadata.DeletionMark{block2Mark, block4Mark}) + []*metadata.DeletionMark{block2Mark, block4Mark}, + []*cortex_tsdb.Tombstone{tombstone1, tombstone2, tombstone3}) - // Hard delete a block and update the index. + // Hard delete a block and tombstone and update the index. require.NoError(t, block.Delete(ctx, log.NewNopLogger(), bucket.NewUserBucketClient(userID, bkt, nil), block2.ULID)) + require.NoError(t, cortex_tsdb.DeleteTombstoneFile(ctx, bkt, nil, userID, tombstone1.RequestID, tombstone1.State)) returnedIdx, _, err = w.UpdateIndex(ctx, returnedIdx) require.NoError(t, err) assertBucketIndexEqual(t, returnedIdx, bkt, userID, []tsdb.BlockMeta{block1, block3, block4}, - []*metadata.DeletionMark{block4Mark}) + []*metadata.DeletionMark{block4Mark}, + []*cortex_tsdb.Tombstone{tombstone2, tombstone3}) } func TestUpdater_UpdateIndex_ShouldSkipPartialBlocks(t *testing.T) { @@ -77,16 +90,18 @@ func TestUpdater_UpdateIndex_ShouldSkipPartialBlocks(t *testing.T) { block2 := testutil.MockStorageBlock(t, bkt, userID, 20, 30) block3 := testutil.MockStorageBlock(t, bkt, userID, 30, 40) block2Mark := testutil.MockStorageDeletionMark(t, bkt, userID, block2) + tombstone1 := testutil.MockTombstone(t, bkt, userID, 0, 0, 0, 0, []string{"series"}, "request1", cortex_tsdb.StatePending) // Delete a block's meta.json to simulate a partial block. require.NoError(t, bkt.Delete(ctx, path.Join(userID, block3.ULID.String(), metadata.MetaFilename))) - w := NewUpdater(bkt, userID, nil, logger) + w := NewUpdater(bkt, userID, nil, blockDeletionDelay, blocksCleanupInterval, newMockBucketStoreCfg(), logger) idx, partials, err := w.UpdateIndex(ctx, nil) require.NoError(t, err) assertBucketIndexEqual(t, idx, bkt, userID, []tsdb.BlockMeta{block1, block2}, - []*metadata.DeletionMark{block2Mark}) + []*metadata.DeletionMark{block2Mark}, + []*cortex_tsdb.Tombstone{tombstone1}) assert.Len(t, partials, 1) assert.True(t, errors.Is(partials[block3.ULID], ErrBlockMetaNotFound)) @@ -106,16 +121,18 @@ func TestUpdater_UpdateIndex_ShouldSkipBlocksWithCorruptedMeta(t *testing.T) { block2 := testutil.MockStorageBlock(t, bkt, userID, 20, 30) block3 := testutil.MockStorageBlock(t, bkt, userID, 30, 40) block2Mark := testutil.MockStorageDeletionMark(t, bkt, userID, block2) + tombstone1 := testutil.MockTombstone(t, bkt, userID, 0, 0, 0, 0, []string{"series"}, "request1", cortex_tsdb.StatePending) // Overwrite a block's meta.json with invalid data. require.NoError(t, bkt.Upload(ctx, path.Join(userID, block3.ULID.String(), metadata.MetaFilename), bytes.NewReader([]byte("invalid!}")))) - w := NewUpdater(bkt, userID, nil, logger) + w := NewUpdater(bkt, userID, nil, blockDeletionDelay, blocksCleanupInterval, newMockBucketStoreCfg(), logger) idx, partials, err := w.UpdateIndex(ctx, nil) require.NoError(t, err) assertBucketIndexEqual(t, idx, bkt, userID, []tsdb.BlockMeta{block1, block2}, - []*metadata.DeletionMark{block2Mark}) + []*metadata.DeletionMark{block2Mark}, + []*cortex_tsdb.Tombstone{tombstone1}) assert.Len(t, partials, 1) assert.True(t, errors.Is(partials[block3.ULID], ErrBlockMetaCorrupted)) @@ -139,12 +156,136 @@ func TestUpdater_UpdateIndex_ShouldSkipCorruptedDeletionMarks(t *testing.T) { // Overwrite a block's deletion-mark.json with invalid data. require.NoError(t, bkt.Upload(ctx, path.Join(userID, block2Mark.ID.String(), metadata.DeletionMarkFilename), bytes.NewReader([]byte("invalid!}")))) - w := NewUpdater(bkt, userID, nil, logger) + w := NewUpdater(bkt, userID, nil, blockDeletionDelay, blocksCleanupInterval, newMockBucketStoreCfg(), logger) idx, partials, err := w.UpdateIndex(ctx, nil) require.NoError(t, err) assertBucketIndexEqual(t, idx, bkt, userID, []tsdb.BlockMeta{block1, block2, block3}, - []*metadata.DeletionMark{}) + []*metadata.DeletionMark{}, + []*cortex_tsdb.Tombstone{}) + assert.Empty(t, partials) +} + +func TestUpdater_UpdateIndex_ShouldSkipCorruptedTombstones(t *testing.T) { + const userID = "user-1" + + bkt, _ := testutil.PrepareFilesystemBucket(t) + + ctx := context.Background() + logger := log.NewNopLogger() + + // Mock some blocks and tombstones in the storage. + bkt = BucketWithGlobalMarkers(bkt) + block1 := testutil.MockStorageBlock(t, bkt, userID, 10, 20) + tombstone1 := testutil.MockTombstone(t, bkt, userID, 0, 0, 0, 0, []string{"series"}, "request1", cortex_tsdb.StatePending) + tombstone2 := testutil.MockTombstone(t, bkt, userID, 0, 0, 0, 0, []string{"series"}, "request2", cortex_tsdb.StatePending) + + // Overwrite a tombstone with invalid data. + require.NoError(t, bkt.Upload(ctx, path.Join(userID, cortex_tsdb.TombstonePath, tombstone1.GetFilename()), bytes.NewReader([]byte("invalid!}")))) + + w := NewUpdater(bkt, userID, nil, blockDeletionDelay, blocksCleanupInterval, newMockBucketStoreCfg(), logger) + idx, partials, err := w.UpdateIndex(ctx, nil) + require.NoError(t, err) + assertBucketIndexEqual(t, idx, bkt, userID, + []tsdb.BlockMeta{block1}, + []*metadata.DeletionMark{}, + []*cortex_tsdb.Tombstone{tombstone2}) + assert.Empty(t, partials) +} + +func TestUpdater_UpdateIndex_ShouldNotUploadProcessedTombstonesPassedFilteringPeriod(t *testing.T) { + const userID = "user-1" + + bkt, _ := testutil.PrepareFilesystemBucket(t) + + ctx := context.Background() + logger := log.NewNopLogger() + + // The bucket index stores all the tombstones that need to be used for query filtering + // All pending state tombstones should be uploaded to the index + // Processed state tombstones should be uploaded if the following time period hasn't passed since they updated to processed + // -compactor.deletion-delay + -compactor.cleanup-interval + -blocks-storage.bucket-store.sync-interval + + bktStoreCfg := cortex_tsdb.BucketStoreConfig{ + SyncInterval: time.Minute, + } + + blockDeletionDelay := time.Minute * 2 + blocksCleanupInterval := time.Minute * 3 + + // Mock some blocks and tombstones in the storage. + bkt = BucketWithGlobalMarkers(bkt) + block1 := testutil.MockStorageBlock(t, bkt, userID, 10, 20) + tombstone1 := testutil.MockTombstone(t, bkt, userID, 0, 0, 0, 0, []string{"series"}, "request1", cortex_tsdb.StatePending) + // request2 should be uploaded because it is in pending state + tombstone2 := testutil.MockTombstone(t, bkt, userID, 0, time.Now().Add(-time.Minute*7).Unix()*1000, 0, 0, []string{"series"}, "request2", cortex_tsdb.StatePending) + // request3 should not be uploaded to the idx since the time period for filtering has passed + testutil.MockTombstone(t, bkt, userID, 0, time.Now().Add(-time.Minute*7).Unix()*1000, 0, 0, []string{"series"}, "request3", cortex_tsdb.StateProcessed) + tombstone4 := testutil.MockTombstone(t, bkt, userID, 0, time.Now().Add(-time.Minute*5).Unix()*1000, 0, 0, []string{"series"}, "request4", cortex_tsdb.StateProcessed) + // request5 should not be uploaded since cancelled tombstones are not required for filtering + testutil.MockTombstone(t, bkt, userID, 0, 0, 0, 0, []string{"series"}, "request5", cortex_tsdb.StateCancelled) + + w := NewUpdater(bkt, userID, nil, blockDeletionDelay, blocksCleanupInterval, bktStoreCfg, logger) + idx, partials, err := w.UpdateIndex(ctx, nil) + require.NoError(t, err) + assertBucketIndexEqual(t, idx, bkt, userID, + []tsdb.BlockMeta{block1}, + []*metadata.DeletionMark{}, + []*cortex_tsdb.Tombstone{tombstone1, tombstone2, tombstone4}) + assert.Empty(t, partials) +} + +func TestUpdater_UpdateIndex_ShouldNotUploadDuplicateTombstones(t *testing.T) { + const userID = "user-1" + + bkt, _ := testutil.PrepareFilesystemBucket(t) + + ctx := context.Background() + logger := log.NewNopLogger() + + // There could be a scenario where a tombstone is updated to a new state + // but the file with the old state is not deleted. The bucket index should only + // keep the file with the most up to date state + + // In order the states are: pending, processed, cancelled + // cancelled state can only happen during pending but should take priority over all other states + + bktStoreCfg := cortex_tsdb.BucketStoreConfig{ + SyncInterval: time.Minute, + } + + blockDeletionDelay := time.Minute * 2 + blocksCleanupInterval := time.Minute * 3 + + // Mock some blocks and tombstones in the storage. + bkt = BucketWithGlobalMarkers(bkt) + block1 := testutil.MockStorageBlock(t, bkt, userID, 10, 20) + tombstone1 := testutil.MockTombstone(t, bkt, userID, 0, 0, 0, 0, []string{"series"}, "request1", cortex_tsdb.StatePending) + tombstone2 := testutil.MockTombstone(t, bkt, userID, 0, time.Now().Add(-time.Minute*7).Unix()*1000, 0, 0, []string{"series"}, "request2", cortex_tsdb.StatePending) + tombstone3 := testutil.MockTombstone(t, bkt, userID, 0, 0, 0, 0, []string{"series"}, "request3", cortex_tsdb.StatePending) + tombstone4 := testutil.MockTombstone(t, bkt, userID, 0, 0, 0, 0, []string{"series"}, "request4", cortex_tsdb.StatePending) + + w := NewUpdater(bkt, userID, nil, blockDeletionDelay, blocksCleanupInterval, bktStoreCfg, logger) + idx, partials, err := w.UpdateIndex(ctx, nil) + require.NoError(t, err) + assertBucketIndexEqual(t, idx, bkt, userID, + []tsdb.BlockMeta{block1}, + []*metadata.DeletionMark{}, + []*cortex_tsdb.Tombstone{tombstone1, tombstone2, tombstone3, tombstone4}) + assert.Empty(t, partials) + + tombstone1 = testutil.MockTombstone(t, bkt, userID, 0, time.Now().Add(-time.Minute*5).Unix()*1000, 0, 0, []string{"series"}, "request1", cortex_tsdb.StateProcessed) + // request3 should no longer be included in the bucket index since the filtering time for processed states is over + testutil.MockTombstone(t, bkt, userID, 0, time.Now().Add(-time.Minute*7).Unix()*1000, 0, 0, []string{"series"}, "request3", cortex_tsdb.StateProcessed) + // request4 should not be uploaded since cancelled tombstones are not included in bucket index + testutil.MockTombstone(t, bkt, userID, 0, 0, 0, 0, []string{"series"}, "request4", cortex_tsdb.StateCancelled) + + idx, partials, err = w.UpdateIndex(ctx, idx) + require.NoError(t, err) + assertBucketIndexEqual(t, idx, bkt, userID, + []tsdb.BlockMeta{block1}, + []*metadata.DeletionMark{}, + []*cortex_tsdb.Tombstone{tombstone1, tombstone2}) assert.Empty(t, partials) } @@ -155,7 +296,7 @@ func TestUpdater_UpdateIndex_NoTenantInTheBucket(t *testing.T) { bkt, _ := testutil.PrepareFilesystemBucket(t) for _, oldIdx := range []*Index{nil, {}} { - w := NewUpdater(bkt, userID, nil, log.NewNopLogger()) + w := NewUpdater(bkt, userID, nil, blockDeletionDelay, blocksCleanupInterval, newMockBucketStoreCfg(), log.NewNopLogger()) idx, partials, err := w.UpdateIndex(ctx, oldIdx) require.NoError(t, err) @@ -176,7 +317,7 @@ func getBlockUploadedAt(t testing.TB, bkt objstore.Bucket, userID string, blockI return attrs.LastModified.Unix() } -func assertBucketIndexEqual(t testing.TB, idx *Index, bkt objstore.Bucket, userID string, expectedBlocks []tsdb.BlockMeta, expectedDeletionMarks []*metadata.DeletionMark) { +func assertBucketIndexEqual(t testing.TB, idx *Index, bkt objstore.Bucket, userID string, expectedBlocks []tsdb.BlockMeta, expectedDeletionMarks []*metadata.DeletionMark, expectedTombstones SeriesDeletionTombstones) { assert.Equal(t, IndexVersion1, idx.Version) assert.InDelta(t, time.Now().Unix(), idx.UpdatedAt, 2) @@ -203,4 +344,14 @@ func assertBucketIndexEqual(t testing.TB, idx *Index, bkt objstore.Bucket, userI } assert.ElementsMatch(t, expectedMarkEntries, idx.BlockDeletionMarks) + assert.ElementsMatch(t, expectedTombstones, idx.Tombstones) +} + +func newMockBucketStoreCfg() cortex_tsdb.BucketStoreConfig { + return cortex_tsdb.BucketStoreConfig{ + SyncInterval: time.Minute, + BucketIndex: cortex_tsdb.BucketIndexConfig{ + MaxStalePeriod: time.Hour, + }, + } } diff --git a/pkg/storage/tsdb/testutil/block_mock.go b/pkg/storage/tsdb/testutil/block_mock.go index a2a931aa59..2fa2903225 100644 --- a/pkg/storage/tsdb/testutil/block_mock.go +++ b/pkg/storage/tsdb/testutil/block_mock.go @@ -1,14 +1,17 @@ package testutil import ( + "bytes" "context" "crypto/rand" "encoding/json" "fmt" + "path" "strings" "testing" "time" + cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb" "github.com/oklog/ulid" "github.com/prometheus/prometheus/tsdb" "github.com/stretchr/testify/require" @@ -66,3 +69,31 @@ func MockStorageDeletionMark(t testing.TB, bucket objstore.Bucket, userID string return &mark } + +func MockTombstone( + t testing.TB, + bucket objstore.Bucket, + userID string, + requestTime, + stateTime, + startTime, + endTime int64, + selectors []string, + requestID string, + state cortex_tsdb.BlockDeleteRequestState) *cortex_tsdb.Tombstone { + + ts := cortex_tsdb.NewTombstone(userID, requestTime, stateTime, startTime, endTime, selectors, requestID, state) + + tombstoneFilename := ts.GetFilename() + path := path.Join(userID, cortex_tsdb.TombstonePath, tombstoneFilename) + data, err := json.Marshal(ts) + + require.NoError(t, err) + require.NoError(t, bucket.Upload(context.Background(), path, bytes.NewReader(data))) + + ts.Matchers, err = cortex_tsdb.ParseMatchers(ts.Selectors) + require.NoError(t, err) + + return ts + +} diff --git a/pkg/storage/tsdb/tombstones.go b/pkg/storage/tsdb/tombstones.go index d244029b90..bc9c5192ad 100644 --- a/pkg/storage/tsdb/tombstones.go +++ b/pkg/storage/tsdb/tombstones.go @@ -32,8 +32,9 @@ const TombstonePath = "tombstones/" var ( ErrTombstoneAlreadyExists = errors.New("The deletion tombstone with the same request information already exists") ErrInvalidDeletionRequestState = errors.New("Deletion request filename extension indicating the state is invalid") - - AllDeletionStates = []BlockDeleteRequestState{StatePending, StateProcessed, StateCancelled} + ErrTombstoneNotFound = errors.New("Tombstone file not found in the object store") + ErrTombstoneDecode = errors.New("Unable to read tombstone contents from file") + AllDeletionStates = []BlockDeleteRequestState{StatePending, StateProcessed, StateCancelled} ) type Tombstone struct { @@ -45,7 +46,7 @@ type Tombstone struct { Selectors []string `json:"selectors"` Matchers []*labels.Matcher `json:"-"` UserID string `json:"user_id"` - State BlockDeleteRequestState `json:"-"` + State BlockDeleteRequestState `json:"state"` } func NewTombstone(userID string, requestTime int64, stateTime int64, startTime int64, endTime int64, selectors []string, requestID string, state BlockDeleteRequestState) *Tombstone { @@ -117,7 +118,7 @@ func GetDeleteRequestByIDForUser(ctx context.Context, bkt objstore.Bucket, cfgPr } if exists { - t, err := readTombstoneFile(ctx, userBucket, userID, path.Join(TombstonePath, filename)) + t, err := ReadTombstoneFile(ctx, userBucket, path.Join(TombstonePath, filename)) if err != nil { return nil, err } @@ -142,7 +143,7 @@ func GetAllDeleteRequestsForUser(ctx context.Context, bkt objstore.Bucket, cfgPr // if a key exists with the same request ID (but two different states) tombstoneMap := make(map[string]*Tombstone) err := userBucket.Iter(ctx, "tombstones/", func(s string) error { - t, err := readTombstoneFile(ctx, userBucket, userID, s) + t, err := ReadTombstoneFile(ctx, userBucket, s) if err != nil { return err } @@ -175,12 +176,12 @@ func GetAllDeleteRequestsForUser(ctx context.Context, bkt objstore.Bucket, cfgPr } func getLatestTombstateByState(a *Tombstone, b *Tombstone) (*Tombstone, error) { - orderA, err := a.GetStateOrder() + orderA, err := a.State.GetStateOrder() if err != nil { return nil, err } - orderB, err := b.GetStateOrder() + orderB, err := b.State.GetStateOrder() if err != nil { return nil, err } @@ -192,29 +193,16 @@ func getLatestTombstateByState(a *Tombstone, b *Tombstone) (*Tombstone, error) { return a, nil } -func readTombstoneFile(ctx context.Context, bkt objstore.BucketReader, userID string, tombstonePath string) (*Tombstone, error) { - userLogger := util_log.WithUserID(userID, util_log.Logger) - - // request filename is in format of request_id + "." + state + ".json" - - // This should get the first extension which is .json - filenameExtesion := filepath.Ext(tombstonePath) - filenameWithoutJSON := tombstonePath[0 : len(tombstonePath)-len(filenameExtesion)] - - stateExtension := filepath.Ext(filenameWithoutJSON) - - // Ensure that the state exists as the filename extension - if len(stateExtension) == 0 { - return nil, ErrInvalidDeletionRequestState - } - - state := BlockDeleteRequestState(stateExtension[1:]) - if !isValidDeleteRequestState(state) { - return nil, errors.Wrapf(ErrInvalidDeletionRequestState, "Filename extension is invalid for tombstone: %s", tombstonePath) - +func ReadTombstoneFile(ctx context.Context, bkt objstore.BucketReader, tombstonePath string) (*Tombstone, error) { + _, _, err := ParseTombstonePath(tombstonePath) + if err != nil { + return nil, errors.Wrapf(err, "failed to get the state from filename: %s", tombstonePath) } r, err := bkt.Get(ctx, tombstonePath) + if bkt.IsObjNotFoundErr(err) { + return nil, errors.Wrapf(ErrTombstoneNotFound, "tombstone file not found %s", tombstonePath) + } if err != nil { return nil, errors.Wrapf(err, "failed to read tombstone object: %s", tombstonePath) } @@ -224,16 +212,14 @@ func readTombstoneFile(ctx context.Context, bkt objstore.BucketReader, userID st // Close reader before dealing with decode error. if closeErr := r.Close(); closeErr != nil { - level.Warn(userLogger).Log("msg", "failed to close bucket reader", "err", closeErr) + level.Warn(util_log.Logger).Log("msg", "failed to close bucket reader", "err", closeErr) } if err != nil { - return nil, errors.Wrapf(err, "failed to decode tombstone object: %s", tombstonePath) + return nil, errors.Wrapf(ErrTombstoneDecode, "failed to decode tombstone object: %s, err: %v", tombstonePath, err.Error()) } - tombstone.State = BlockDeleteRequestState(state) - - tombstone.Matchers, err = parseMatchers(tombstone.Selectors) + tombstone.Matchers, err = ParseMatchers(tombstone.Selectors) if err != nil { return nil, errors.Wrapf(err, "failed to parse tombstone selectors for: %s", tombstonePath) } @@ -241,7 +227,29 @@ func readTombstoneFile(ctx context.Context, bkt objstore.BucketReader, userID st return tombstone, nil } -func parseMatchers(selectors []string) ([]*labels.Matcher, error) { +func ParseTombstonePath(tombstonePath string) (string, BlockDeleteRequestState, error) { + // This should get the first extension which is .json + filenameExtesion := filepath.Ext(tombstonePath) + filenameWithoutJSON := tombstonePath[0 : len(tombstonePath)-len(filenameExtesion)] + + stateExtension := filepath.Ext(filenameWithoutJSON) + requestID := filenameWithoutJSON[0 : len(filenameWithoutJSON)-len(stateExtension)] + + // Ensure that the state exists as the filename extension + if len(stateExtension) == 0 { + return "", "", ErrInvalidDeletionRequestState + } + + state := BlockDeleteRequestState(stateExtension[1:]) + if !isValidDeleteRequestState(state) { + return "", "", errors.Wrapf(ErrInvalidDeletionRequestState, "Filename extension is invalid for tombstone: %s", tombstonePath) + + } + + return requestID, state, nil +} + +func ParseMatchers(selectors []string) ([]*labels.Matcher, error) { // Convert the string selectors to label matchers var m []*labels.Matcher @@ -312,6 +320,14 @@ func RemoveCancelledStateIfExists(ctx context.Context, bkt objstore.Bucket, user return nil } +func (t *Tombstone) GetFilename() string { + return t.RequestID + "." + string(t.State) + ".json" +} + +func (t Tombstone) IsOverlappingInterval(minT int64, maxT int64) bool { + return t.StartTime <= maxT && minT < t.EndTime +} + func isValidDeleteRequestState(state BlockDeleteRequestState) bool { switch state { case @@ -323,8 +339,8 @@ func isValidDeleteRequestState(state BlockDeleteRequestState) bool { return false } -func (t *Tombstone) GetStateOrder() (int, error) { - switch t.State { +func (s BlockDeleteRequestState) GetStateOrder() (int, error) { + switch s { case StatePending: return 0, nil case StateProcessed: diff --git a/pkg/storage/tsdb/tombstones_test.go b/pkg/storage/tsdb/tombstones_test.go index 178f523dbb..c3e258b07e 100644 --- a/pkg/storage/tsdb/tombstones_test.go +++ b/pkg/storage/tsdb/tombstones_test.go @@ -240,22 +240,28 @@ func TestTombstoneReadWithInvalidFileName(t *testing.T) { { tInvalidPath := username + "/tombstones/" + requestID + "." + string(StatePending) - _, err := readTombstoneFile(ctx, bkt, username, tInvalidPath) + _, err := ReadTombstoneFile(ctx, bkt, tInvalidPath) require.ErrorIs(t, err, ErrInvalidDeletionRequestState) } { tInvalidPath := username + "/tombstones/" + requestID - _, err := readTombstoneFile(ctx, bkt, username, tInvalidPath) + _, err := ReadTombstoneFile(ctx, bkt, tInvalidPath) require.ErrorIs(t, err, ErrInvalidDeletionRequestState) } { tInvalidPath := username + "/tombstones/" + requestID + ".json." + string(StatePending) - _, err := readTombstoneFile(ctx, bkt, username, tInvalidPath) + _, err := ReadTombstoneFile(ctx, bkt, tInvalidPath) require.ErrorIs(t, err, ErrInvalidDeletionRequestState) } + { + tNotExists := username + "/tombstones/" + requestID + "." + string(StatePending) + ".json" + _, err := ReadTombstoneFile(ctx, bkt, tNotExists) + require.ErrorIs(t, err, ErrTombstoneNotFound) + } + } diff --git a/pkg/storegateway/gateway_test.go b/pkg/storegateway/gateway_test.go index a3ba3aa814..bde9fbe270 100644 --- a/pkg/storegateway/gateway_test.go +++ b/pkg/storegateway/gateway_test.go @@ -968,7 +968,7 @@ func (m *mockShardingStrategy) FilterBlocks(ctx context.Context, userID string, } func createBucketIndex(t *testing.T, bkt objstore.Bucket, userID string) *bucketindex.Index { - updater := bucketindex.NewUpdater(bkt, userID, nil, log.NewNopLogger()) + updater := bucketindex.NewUpdater(bkt, userID, nil, time.Minute, time.Minute, cortex_tsdb.BucketStoreConfig{}, log.NewNopLogger()) idx, _, err := updater.UpdateIndex(context.Background(), nil) require.NoError(t, err) require.NoError(t, bucketindex.WriteIndex(context.Background(), bkt, userID, nil, idx)) diff --git a/pkg/storegateway/metadata_fetcher_filters_test.go b/pkg/storegateway/metadata_fetcher_filters_test.go index 84e6ec0cfe..43acab37cc 100644 --- a/pkg/storegateway/metadata_fetcher_filters_test.go +++ b/pkg/storegateway/metadata_fetcher_filters_test.go @@ -20,6 +20,7 @@ import ( "github.com/thanos-io/thanos/pkg/objstore" "github.com/cortexproject/cortex/pkg/storage/bucket" + cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb" "github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex" cortex_testutil "github.com/cortexproject/cortex/pkg/storage/tsdb/testutil" ) @@ -68,7 +69,7 @@ func testIgnoreDeletionMarkFilter(t *testing.T, bucketIndexEnabled bool) { if bucketIndexEnabled { var err error - u := bucketindex.NewUpdater(bkt, userID, nil, logger) + u := bucketindex.NewUpdater(bkt, userID, nil, time.Minute, time.Minute, cortex_tsdb.BucketStoreConfig{}, logger) idx, _, err = u.UpdateIndex(ctx, nil) require.NoError(t, err) require.NoError(t, bucketindex.WriteIndex(ctx, bkt, userID, nil, idx)) From 8deff6b0b8f3f832025cb61378c563a0f92fbfb2 Mon Sep 17 00:00:00 2001 From: ilangofman Date: Thu, 29 Jul 2021 16:12:15 -0400 Subject: [PATCH 2/5] Add missing unit tests Signed-off-by: ilangofman --- pkg/querier/blocks_store_queryable_test.go | 196 ++++++++++++++++++++- pkg/storage/tsdb/bucketindex/updater.go | 4 - 2 files changed, 188 insertions(+), 12 deletions(-) diff --git a/pkg/querier/blocks_store_queryable_test.go b/pkg/querier/blocks_store_queryable_test.go index d7bb456836..c0441a2afc 100644 --- a/pkg/querier/blocks_store_queryable_test.go +++ b/pkg/querier/blocks_store_queryable_test.go @@ -15,6 +15,7 @@ import ( "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/prometheus/common/model" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/storage" @@ -28,6 +29,7 @@ import ( "github.com/weaveworks/common/user" "google.golang.org/grpc" + "github.com/cortexproject/cortex/pkg/chunk/purger" "github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex" "github.com/cortexproject/cortex/pkg/storegateway/storegatewaypb" "github.com/cortexproject/cortex/pkg/util" @@ -44,14 +46,16 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { ) var ( - block1 = ulid.MustNew(1, nil) - block2 = ulid.MustNew(2, nil) - block3 = ulid.MustNew(3, nil) - block4 = ulid.MustNew(4, nil) - metricNameLabel = labels.Label{Name: labels.MetricName, Value: metricName} - series1Label = labels.Label{Name: "series", Value: "1"} - series2Label = labels.Label{Name: "series", Value: "2"} - noOpQueryLimiter = limiter.NewQueryLimiter(0) + block1 = ulid.MustNew(1, nil) + block2 = ulid.MustNew(2, nil) + block3 = ulid.MustNew(3, nil) + block4 = ulid.MustNew(4, nil) + metricNameLabel = labels.Label{Name: labels.MetricName, Value: metricName} + series1Label = labels.Label{Name: "series", Value: "1"} + series2Label = labels.Label{Name: "series", Value: "2"} + noOpQueryLimiter = limiter.NewQueryLimiter(0) + metricNameMatcher = labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, metricName) + seriesMatcher2 = labels.MustNewMatcher(labels.MatchEqual, "series", "2") ) type valueResult struct { @@ -73,6 +77,7 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { expectedSeries []seriesResult expectedErr error expectedMetrics string + tombstoneSet *purger.TombstonesSet }{ "no block in the storage matching the query time range": { finderResult: nil, @@ -510,6 +515,172 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { queryLimiter: limiter.NewQueryLimiter(1), expectedErr: validation.LimitError(fmt.Sprintf("The query hit the max number of series limit (limit: %d)", 1)), }, + "single tombstones exist and should filter the entire returned series": { + finderResult: bucketindex.Blocks{ + {ID: block1}, + {ID: block2}, + }, + storeSetResponses: []interface{}{ + map[BlocksStoreClient][]ulid.ULID{ + &storeGatewayClientMock{remoteAddr: "1.1.1.1", mockedSeriesResponses: []*storepb.SeriesResponse{ + mockSeriesResponse(labels.Labels{metricNameLabel}, minT+1, 2), + mockSeriesResponse(labels.Labels{metricNameLabel}, minT+2, 2), + mockSeriesResponse(labels.Labels{metricNameLabel}, minT+3, 2), + mockHintsResponse(block1), + }}: {block1}, + &storeGatewayClientMock{remoteAddr: "2.2.2.2", mockedSeriesResponses: []*storepb.SeriesResponse{ + mockSeriesResponse(labels.Labels{metricNameLabel}, minT, 1), + mockSeriesResponse(labels.Labels{metricNameLabel}, minT+1, 2), + mockHintsResponse(block2), + }}: {block2}, + }, + }, + limits: &blocksStoreLimitsMock{}, + queryLimiter: noOpQueryLimiter, + expectedSeries: []seriesResult{ + { + lbls: labels.New(metricNameLabel), + values: []valueResult(nil), + }, + }, + tombstoneSet: purger.NewTombstoneSet([]purger.DeleteRequest{ + {StartTime: model.Time(minT), EndTime: model.Time(minT + 3), Matchers: [][]*labels.Matcher{{metricNameMatcher}}}}, + model.Time(minT), model.Time(minT+3)), + }, + "single tombstones exist and should filter part of returned series": { + finderResult: bucketindex.Blocks{ + {ID: block1}, + {ID: block2}, + }, + storeSetResponses: []interface{}{ + map[BlocksStoreClient][]ulid.ULID{ + &storeGatewayClientMock{remoteAddr: "1.1.1.1", mockedSeriesResponses: []*storepb.SeriesResponse{ + mockSeriesResponse(labels.Labels{metricNameLabel}, minT+1, 2), + mockSeriesResponse(labels.Labels{metricNameLabel}, minT+2, 3), + mockSeriesResponse(labels.Labels{metricNameLabel}, minT+3, 4), + mockHintsResponse(block1), + }}: {block1}, + &storeGatewayClientMock{remoteAddr: "2.2.2.2", mockedSeriesResponses: []*storepb.SeriesResponse{ + mockSeriesResponse(labels.Labels{metricNameLabel}, minT, 1), + mockSeriesResponse(labels.Labels{metricNameLabel}, minT+1, 2), + mockHintsResponse(block2), + }}: {block2}, + }, + }, + limits: &blocksStoreLimitsMock{}, + queryLimiter: noOpQueryLimiter, + expectedSeries: []seriesResult{ + { + lbls: labels.New(metricNameLabel), + values: []valueResult{ + {t: minT + 2, v: 3}, + {t: minT + 3, v: 4}, + }, + }, + }, + tombstoneSet: purger.NewTombstoneSet([]purger.DeleteRequest{ + {StartTime: model.Time(minT), EndTime: model.Time(minT + 1), Matchers: [][]*labels.Matcher{{metricNameMatcher}}}}, + model.Time(minT), model.Time(minT+1)), + }, + "multiple tombstones exist for the same series and should filter part of returned series": { + finderResult: bucketindex.Blocks{ + {ID: block1}, + {ID: block2}, + }, + storeSetResponses: []interface{}{ + map[BlocksStoreClient][]ulid.ULID{ + &storeGatewayClientMock{remoteAddr: "1.1.1.1", mockedSeriesResponses: []*storepb.SeriesResponse{ + mockSeriesResponse(labels.Labels{metricNameLabel}, minT+1, 2), + mockSeriesResponse(labels.Labels{metricNameLabel}, minT+2, 3), + mockSeriesResponse(labels.Labels{metricNameLabel}, minT+3, 4), + mockSeriesResponse(labels.Labels{metricNameLabel}, minT+7, 2), + mockSeriesResponse(labels.Labels{metricNameLabel}, minT+9, 2), + mockSeriesResponse(labels.Labels{metricNameLabel}, minT+10, 3), + + mockHintsResponse(block1), + }}: {block1}, + &storeGatewayClientMock{remoteAddr: "2.2.2.2", mockedSeriesResponses: []*storepb.SeriesResponse{ + mockSeriesResponse(labels.Labels{metricNameLabel}, minT, 1), + mockSeriesResponse(labels.Labels{metricNameLabel}, minT+1, 2), + mockSeriesResponse(labels.Labels{metricNameLabel}, minT+7, 2), + mockSeriesResponse(labels.Labels{metricNameLabel}, minT+9, 2), + mockSeriesResponse(labels.Labels{metricNameLabel}, minT+10, 3), + + mockHintsResponse(block2), + }}: {block2}, + }, + }, + limits: &blocksStoreLimitsMock{}, + queryLimiter: noOpQueryLimiter, + expectedSeries: []seriesResult{ + { + lbls: labels.New(metricNameLabel), + values: []valueResult{ + {t: minT + 2, v: 3}, + {t: minT + 3, v: 4}, + {t: minT + 10, v: 3}, + }, + }, + }, + tombstoneSet: purger.NewTombstoneSet([]purger.DeleteRequest{ + {StartTime: model.Time(minT), EndTime: model.Time(minT + 1), Matchers: [][]*labels.Matcher{{metricNameMatcher}}}, + {StartTime: model.Time(minT + 6), EndTime: model.Time(minT + 9), Matchers: [][]*labels.Matcher{{metricNameMatcher}}}}, + model.Time(minT), model.Time(minT+9)), + }, + "multiple tombstones exist for the different series and should filter part of returned series": { + finderResult: bucketindex.Blocks{ + {ID: block1}, + {ID: block2}, + }, + storeSetResponses: []interface{}{ + map[BlocksStoreClient][]ulid.ULID{ + &storeGatewayClientMock{remoteAddr: "1.1.1.1", mockedSeriesResponses: []*storepb.SeriesResponse{ + mockSeriesResponse(labels.Labels{metricNameLabel}, minT+1, 2), + mockSeriesResponse(labels.Labels{metricNameLabel}, minT+2, 3), + mockSeriesResponse(labels.Labels{metricNameLabel}, minT+3, 4), + mockSeriesResponse(labels.Labels{metricNameLabel}, minT+7, 2), + mockSeriesResponse(labels.Labels{metricNameLabel}, minT+9, 2), + mockSeriesResponse(labels.Labels{series2Label}, minT+10, 3), + + mockHintsResponse(block1), + }}: {block1}, + &storeGatewayClientMock{remoteAddr: "2.2.2.2", mockedSeriesResponses: []*storepb.SeriesResponse{ + mockSeriesResponse(labels.Labels{metricNameLabel}, minT, 1), + mockSeriesResponse(labels.Labels{metricNameLabel}, minT+1, 2), + mockSeriesResponse(labels.Labels{series2Label}, minT+1, 1), + mockSeriesResponse(labels.Labels{series2Label}, minT+7, 2), + mockSeriesResponse(labels.Labels{series2Label}, minT+9, 2), + mockSeriesResponse(labels.Labels{series2Label}, minT+10, 3), + + mockHintsResponse(block2), + }}: {block2}, + }, + }, + limits: &blocksStoreLimitsMock{}, + queryLimiter: noOpQueryLimiter, + expectedSeries: []seriesResult{ + { + lbls: labels.New(metricNameLabel), + values: []valueResult{ + {t: minT + 2, v: 3}, + {t: minT + 3, v: 4}, + {t: minT + 7, v: 2}, + {t: minT + 9, v: 2}, + }, + }, + { + lbls: labels.New(series2Label), + values: []valueResult{ + {t: minT + 1, v: 1}, + {t: minT + 10, v: 3}, + }, + }, + }, + tombstoneSet: purger.NewTombstoneSet([]purger.DeleteRequest{ + {StartTime: model.Time(minT), EndTime: model.Time(minT + 1), Matchers: [][]*labels.Matcher{{metricNameMatcher}}}, + {StartTime: model.Time(minT + 6), EndTime: model.Time(minT + 9), Matchers: [][]*labels.Matcher{{seriesMatcher2}}}}, + model.Time(minT), model.Time(minT+9)), + }, } for testName, testData := range tests { @@ -519,6 +690,7 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { stores := &blocksStoreSetMock{mockedResponses: testData.storeSetResponses} finder := &blocksFinderMock{} finder.On("GetBlocks", mock.Anything, "user-1", minT, maxT).Return(testData.finderResult, map[ulid.ULID]*bucketindex.BlockDeletionMark(nil), testData.finderErr) + finder.On("GetTombstones", mock.Anything, "user-1", mock.Anything, mock.Anything).Return(testData.tombstoneSet, error(nil)) q := &blocksStoreQuerier{ ctx: ctx, @@ -997,6 +1169,7 @@ func TestBlocksStoreQuerier_Labels(t *testing.T) { stores := &blocksStoreSetMock{mockedResponses: testData.storeSetResponses} finder := &blocksFinderMock{} finder.On("GetBlocks", mock.Anything, "user-1", minT, maxT).Return(testData.finderResult, map[ulid.ULID]*bucketindex.BlockDeletionMark(nil), testData.finderErr) + finder.On("GetTombstones", mock.Anything, "user-1", mock.Anything, mock.Anything).Return(purger.NewTombstoneSet(nil, 0, 0), error(nil)) q := &blocksStoreQuerier{ ctx: ctx, @@ -1093,6 +1266,7 @@ func TestBlocksStoreQuerier_SelectSortedShouldHonorQueryStoreAfter(t *testing.T) t.Run(testName, func(t *testing.T) { finder := &blocksFinderMock{} finder.On("GetBlocks", mock.Anything, "user-1", mock.Anything, mock.Anything).Return(bucketindex.Blocks(nil), map[ulid.ULID]*bucketindex.BlockDeletionMark(nil), error(nil)) + finder.On("GetTombstones", mock.Anything, "user-1", mock.Anything, mock.Anything).Return(purger.NewTombstoneSet(nil, 0, 0), error(nil)) q := &blocksStoreQuerier{ ctx: context.Background(), @@ -1159,6 +1333,7 @@ func TestBlocksStoreQuerier_PromQLExecution(t *testing.T) { {ID: block1}, {ID: block2}, }, map[ulid.ULID]*bucketindex.BlockDeletionMark(nil), error(nil)) + finder.On("GetTombstones", mock.Anything, "user-1", mock.Anything, mock.Anything).Return(purger.NewTombstoneSet(nil, 0, 0), error(nil)) // Mock the store to simulate each block is queried from a different store-gateway. gateway1 := &storeGatewayClientMock{remoteAddr: "1.1.1.1", mockedSeriesResponses: []*storepb.SeriesResponse{ @@ -1284,6 +1459,11 @@ func (m *blocksFinderMock) GetBlocks(ctx context.Context, userID string, minT, m return args.Get(0).(bucketindex.Blocks), args.Get(1).(map[ulid.ULID]*bucketindex.BlockDeletionMark), args.Error(2) } +func (m *blocksFinderMock) GetTombstones(ctx context.Context, userID string, minT, maxT int64) (*purger.TombstonesSet, error) { + args := m.Called(ctx, userID, minT, maxT) + return args.Get(0).(*purger.TombstonesSet), args.Error(1) +} + type storeGatewayClientMock struct { remoteAddr string mockedSeriesResponses []*storepb.SeriesResponse diff --git a/pkg/storage/tsdb/bucketindex/updater.go b/pkg/storage/tsdb/bucketindex/updater.go index d71ed74e42..2d7ea94be0 100644 --- a/pkg/storage/tsdb/bucketindex/updater.go +++ b/pkg/storage/tsdb/bucketindex/updater.go @@ -250,10 +250,6 @@ func (w *Updater) updateBlockDeletionMarkIndexEntry(ctx context.Context, id ulid return BlockDeletionMarkFromThanosMarker(&m), nil } -// just get the tombstone filename and save that to a discovered list -// do all the caching stuff and remove the old ones from the discovered list. -// Then once we get the final list, remove duplicates -// TODO make it work with the old tombstones to help with caching func (w *Updater) updateSeriesDeletionTombstones(ctx context.Context, oldTombstones []*cortex_tsdb.Tombstone) ([]*cortex_tsdb.Tombstone, error) { out := make([]*cortex_tsdb.Tombstone, 0, len(oldTombstones)) discovered := make(map[string]cortex_tsdb.BlockDeleteRequestState) From 3db2c871263323dd143cec4a3938469716eb3b62 Mon Sep 17 00:00:00 2001 From: ilangofman Date: Thu, 29 Jul 2021 19:28:51 -0400 Subject: [PATCH 3/5] remove extra lines Signed-off-by: ilangofman --- pkg/compactor/compactor_test.go | 1 - pkg/querier/blocks_finder_bucket_index.go | 1 - 2 files changed, 2 deletions(-) diff --git a/pkg/compactor/compactor_test.go b/pkg/compactor/compactor_test.go index fbc0fed1ed..0fd99a1d70 100644 --- a/pkg/compactor/compactor_test.go +++ b/pkg/compactor/compactor_test.go @@ -1046,7 +1046,6 @@ func uploadTombstone(t *testing.T, bkt objstore.Bucket, userID string, tombstone require.NoError(t, err) require.NoError(t, bkt.Upload(context.Background(), path, bytes.NewReader(data))) - } func findCompactorByUserID(compactors []*Compactor, logs []*concurrency.SyncBuffer, userID string) (*Compactor, *concurrency.SyncBuffer, error) { diff --git a/pkg/querier/blocks_finder_bucket_index.go b/pkg/querier/blocks_finder_bucket_index.go index baa18d0e10..3f29a39054 100644 --- a/pkg/querier/blocks_finder_bucket_index.go +++ b/pkg/querier/blocks_finder_bucket_index.go @@ -51,7 +51,6 @@ func NewBucketIndexBlocksFinder(cfg BucketIndexBlocksFinderConfig, bkt objstore. } // GetBlocks implements BlocksFinder. -// ILAN IMPORTANT func (f *BucketIndexBlocksFinder) GetBlocks(ctx context.Context, userID string, minT, maxT int64) (bucketindex.Blocks, map[ulid.ULID]*bucketindex.BlockDeletionMark, error) { if f.State() != services.Running { return nil, nil, errBucketIndexBlocksFinderNotRunning From 0bbd21f1cd844786cbf5588a18a84d7b5a20f561 Mon Sep 17 00:00:00 2001 From: ilangofman Date: Fri, 30 Jul 2021 11:04:44 -0400 Subject: [PATCH 4/5] Minor refactor Signed-off-by: ilangofman --- pkg/querier/blocks_finder_bucket_scan.go | 2 +- pkg/storage/tsdb/bucketindex/updater.go | 6 ++---- pkg/storage/tsdb/tombstones.go | 6 +++++- 3 files changed, 8 insertions(+), 6 deletions(-) diff --git a/pkg/querier/blocks_finder_bucket_scan.go b/pkg/querier/blocks_finder_bucket_scan.go index 0a6efa9289..ec92c2255b 100644 --- a/pkg/querier/blocks_finder_bucket_scan.go +++ b/pkg/querier/blocks_finder_bucket_scan.go @@ -152,7 +152,7 @@ func (d *BucketScanBlocksFinder) GetBlocks(_ context.Context, userID string, min } func (d *BucketScanBlocksFinder) GetTombstones(_ context.Context, _ string, _ int64, _ int64) (*purger.TombstonesSet, error) { - level.Warn(util_log.Logger).Log("msg", "Series deletion query time Filtering is not supported without bucket index enabled") + // Series deletion query time filtering is not supported without bucket index enabled return nil, nil } diff --git a/pkg/storage/tsdb/bucketindex/updater.go b/pkg/storage/tsdb/bucketindex/updater.go index 2d7ea94be0..f97ec41f9a 100644 --- a/pkg/storage/tsdb/bucketindex/updater.go +++ b/pkg/storage/tsdb/bucketindex/updater.go @@ -350,10 +350,8 @@ func (w *Updater) isTombstoneForFiltering(t *cortex_tsdb.Tombstone) bool { // The tombstones need to be used for query time filtering until we can guarantee that the queriers // have picked up the new blocks and no longer will query any of the deleted blocks. // This time should be enough to guarantee that the new blocks will be queried: - filterTimeAfterProcessed := w.bktCfg.SyncInterval.Milliseconds() + w.blocksDeletionDelay.Milliseconds() + w.blocksCleanupInterval.Milliseconds() - timePassedSinceProcessed := (time.Now().Unix() * 1000) - t.StateCreatedAt - - if t.State == cortex_tsdb.StateProcessed && filterTimeAfterProcessed > timePassedSinceProcessed { + filterTimeAfterProcessed := w.bktCfg.SyncInterval + w.blocksDeletionDelay + w.blocksCleanupInterval + if t.State == cortex_tsdb.StateProcessed && filterTimeAfterProcessed > time.Since(t.GetStateTime()) { return true } diff --git a/pkg/storage/tsdb/tombstones.go b/pkg/storage/tsdb/tombstones.go index bc9c5192ad..f82990b34d 100644 --- a/pkg/storage/tsdb/tombstones.go +++ b/pkg/storage/tsdb/tombstones.go @@ -324,10 +324,14 @@ func (t *Tombstone) GetFilename() string { return t.RequestID + "." + string(t.State) + ".json" } -func (t Tombstone) IsOverlappingInterval(minT int64, maxT int64) bool { +func (t *Tombstone) IsOverlappingInterval(minT int64, maxT int64) bool { return t.StartTime <= maxT && minT < t.EndTime } +func (t *Tombstone) GetStateTime() time.Time { + return time.Unix(t.StateCreatedAt/1000, 0) +} + func isValidDeleteRequestState(state BlockDeleteRequestState) bool { switch state { case From 6400dcc767f1d4476b4acde6871577d211966552 Mon Sep 17 00:00:00 2001 From: ilangofman Date: Mon, 16 Aug 2021 16:18:42 -0700 Subject: [PATCH 5/5] Fix nit comments Signed-off-by: ilangofman --- pkg/querier/blocks_store_queryable.go | 2 +- pkg/querier/blocks_store_queryable_test.go | 18 +++++++++--------- pkg/storage/tsdb/bucketindex/index.go | 2 +- pkg/storage/tsdb/bucketindex/updater.go | 6 +++--- 4 files changed, 14 insertions(+), 14 deletions(-) diff --git a/pkg/querier/blocks_store_queryable.go b/pkg/querier/blocks_store_queryable.go index 88fd130c69..f9c7ce7a85 100644 --- a/pkg/querier/blocks_store_queryable.go +++ b/pkg/querier/blocks_store_queryable.go @@ -79,7 +79,7 @@ type BlocksFinder interface { // and maxT (milliseconds, both included). Returned blocks are sorted by MaxTime descending. GetBlocks(ctx context.Context, userID string, minT, maxT int64) (bucketindex.Blocks, map[ulid.ULID]*bucketindex.BlockDeletionMark, error) - // // GetTombstones returns all the tombstones that are currently required for filtering deleted series + // GetTombstones returns all the tombstones that are currently required for filtering deleted series. GetTombstones(ctx context.Context, userID string, minT, maxT int64) (*purger.TombstonesSet, error) } diff --git a/pkg/querier/blocks_store_queryable_test.go b/pkg/querier/blocks_store_queryable_test.go index 0221640e0c..90ff7a7326 100644 --- a/pkg/querier/blocks_store_queryable_test.go +++ b/pkg/querier/blocks_store_queryable_test.go @@ -46,15 +46,15 @@ func TestBlocksStoreQuerier_Select(t *testing.T) { ) var ( - block1 = ulid.MustNew(1, nil) - block2 = ulid.MustNew(2, nil) - block3 = ulid.MustNew(3, nil) - block4 = ulid.MustNew(4, nil) - metricNameLabel = labels.Label{Name: labels.MetricName, Value: metricName} - series1Label = labels.Label{Name: "series", Value: "1"} - series2Label = labels.Label{Name: "series", Value: "2"} - noOpQueryLimiter = limiter.NewQueryLimiter(0, 0, 0) - metricNameMatcher = labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, metricName) + block1 = ulid.MustNew(1, nil) + block2 = ulid.MustNew(2, nil) + block3 = ulid.MustNew(3, nil) + block4 = ulid.MustNew(4, nil) + metricNameLabel = labels.Label{Name: labels.MetricName, Value: metricName} + series1Label = labels.Label{Name: "series", Value: "1"} + series2Label = labels.Label{Name: "series", Value: "2"} + noOpQueryLimiter = limiter.NewQueryLimiter(0, 0, 0) + metricNameMatcher = labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, metricName) seriesMatcher2 = labels.MustNewMatcher(labels.MatchEqual, "series", "2") ) diff --git a/pkg/storage/tsdb/bucketindex/index.go b/pkg/storage/tsdb/bucketindex/index.go index f6e0117feb..f3b3052e3d 100644 --- a/pkg/storage/tsdb/bucketindex/index.go +++ b/pkg/storage/tsdb/bucketindex/index.go @@ -38,7 +38,7 @@ type Index struct { // List of block deletion marks. BlockDeletionMarks BlockDeletionMarks `json:"block_deletion_marks"` - // List of tombstones that require query time filtering for deleted series + // List of tombstones that are required for query time filtering of deleted series. Tombstones SeriesDeletionTombstones `json:"series_deletion_tombstones"` // UpdatedAt is a unix timestamp (seconds precision) of when the index has been updated diff --git a/pkg/storage/tsdb/bucketindex/updater.go b/pkg/storage/tsdb/bucketindex/updater.go index 2d7ea94be0..4ecf7f8389 100644 --- a/pkg/storage/tsdb/bucketindex/updater.go +++ b/pkg/storage/tsdb/bucketindex/updater.go @@ -254,9 +254,9 @@ func (w *Updater) updateSeriesDeletionTombstones(ctx context.Context, oldTombsto out := make([]*cortex_tsdb.Tombstone, 0, len(oldTombstones)) discovered := make(map[string]cortex_tsdb.BlockDeleteRequestState) - err := w.bkt.Iter(ctx, "tombstones/", func(s string) error { - tName := filepath.Base(s) - requestID, state, err := cortex_tsdb.ParseTombstonePath(tName) + err := w.bkt.Iter(ctx, cortex_tsdb.TombstonePath, func(s string) error { + tombstoneName := filepath.Base(s) + requestID, state, err := cortex_tsdb.GetTombstoneStateAndRequestIDFromPath(tombstoneName) if err != nil { return err }