From 3ee127308e6bd27d579c4a32f67aebc00e2934bb Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Fri, 16 Feb 2024 09:40:48 -0800 Subject: [PATCH 01/14] use .tar.gz and .json storage extensions Signed-off-by: Owen Diehl --- pkg/storage/stores/shipper/bloomshipper/resolver.go | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/pkg/storage/stores/shipper/bloomshipper/resolver.go b/pkg/storage/stores/shipper/bloomshipper/resolver.go index 40a59cee42dbc..88235b9e153f0 100644 --- a/pkg/storage/stores/shipper/bloomshipper/resolver.go +++ b/pkg/storage/stores/shipper/bloomshipper/resolver.go @@ -14,6 +14,9 @@ const ( BloomPrefix = "bloom" MetasPrefix = "metas" BlocksPrefix = "blocks" + + extTarGz = ".tar.gz" + extJSON = ".json" ) // KeyResolver is an interface for resolving keys to locations. @@ -36,7 +39,7 @@ func (defaultKeyResolver) Meta(ref MetaRef) Location { fmt.Sprintf("%v", ref.TableName), ref.TenantID, MetasPrefix, - fmt.Sprintf("%v-%v", ref.Bounds, ref.Checksum), + fmt.Sprintf("%v-%v%s", ref.Bounds, ref.Checksum, extJSON), } } @@ -50,7 +53,8 @@ func (defaultKeyResolver) ParseMetaKey(loc Location) (MetaRef, error) { if err != nil { return MetaRef{}, fmt.Errorf("failed to parse bounds of meta key %s : %w", loc, err) } - checksum, err := strconv.ParseUint(fnParts[2], 16, 64) + withoutExt := strings.TrimSuffix(fnParts[2], extJSON) + checksum, err := strconv.ParseUint(withoutExt, 16, 64) if err != nil { return MetaRef{}, fmt.Errorf("failed to parse checksum of meta key %s : %w", loc, err) } @@ -77,7 +81,7 @@ func (defaultKeyResolver) Block(ref BlockRef) Location { ref.TenantID, BlocksPrefix, ref.Bounds.String(), - fmt.Sprintf("%d-%d-%x", ref.StartTimestamp, ref.EndTimestamp, ref.Checksum), + fmt.Sprintf("%d-%d-%x%s", ref.StartTimestamp, ref.EndTimestamp, ref.Checksum, extTarGz), } } From 573a90dd3ca95cf5917d04e2059ad94fd65d024e Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Fri, 16 Feb 2024 10:37:59 -0800 Subject: [PATCH 02/14] more 
bloomcompactor observability Signed-off-by: Owen Diehl --- pkg/bloomcompactor/bloomcompactor.go | 15 ++++++- pkg/bloomcompactor/controller.go | 58 ++++++++++++++++++++-------- pkg/bloomcompactor/metrics.go | 15 +++++++ 3 files changed, 69 insertions(+), 19 deletions(-) diff --git a/pkg/bloomcompactor/bloomcompactor.go b/pkg/bloomcompactor/bloomcompactor.go index 3bb1c815e8295..ef41edca95572 100644 --- a/pkg/bloomcompactor/bloomcompactor.go +++ b/pkg/bloomcompactor/bloomcompactor.go @@ -214,6 +214,7 @@ func (c *Compactor) ownsTenant(tenant string) (v1.FingerprintBounds, bool, error // runs a single round of compaction for all relevant tenants and tables func (c *Compactor) runOne(ctx context.Context) error { + level.Info(c.logger).Log("msg", "running bloom compaction") var workersErr error var wg sync.WaitGroup ch := make(chan tenantTable) @@ -226,7 +227,11 @@ func (c *Compactor) runOne(ctx context.Context) error { err := c.loadWork(ctx, ch) wg.Wait() - return multierror.New(workersErr, err, ctx.Err()).Err() + err = multierror.New(workersErr, err, ctx.Err()).Err() + if err != nil { + level.Error(c.logger).Log("msg", "compaction iteration failed", "err", err) + } + return err } func (c *Compactor) tables(ts time.Time) *dayRangeIterator { @@ -241,6 +246,7 @@ func (c *Compactor) tables(ts time.Time) *dayRangeIterator { fromDay := config.NewDayTime(model.TimeFromUnixNano(from)) throughDay := config.NewDayTime(model.TimeFromUnixNano(through)) + level.Debug(c.logger).Log("msg", "loaded tables for compaction", "from", fromDay, "through", throughDay) return newDayRangeIterator(fromDay, throughDay, c.schemaCfg) } @@ -250,6 +256,8 @@ func (c *Compactor) loadWork(ctx context.Context, ch chan<- tenantTable) error { for tables.Next() && tables.Err() == nil && ctx.Err() == nil { table := tables.At() + level.Debug(c.logger).Log("msg", "loading work for table", "table", table) + tenants, err := c.tenants(ctx, table) if err != nil { return errors.Wrap(err, "getting tenants") @@ 
-262,6 +270,7 @@ func (c *Compactor) loadWork(ctx context.Context, ch chan<- tenantTable) error { if err != nil { return errors.Wrap(err, "checking tenant ownership") } + level.Debug(c.logger).Log("msg", "enqueueing work for tenant", "tenant", tenant, "table", table, "ownership", ownershipRange.String(), "owns", owns) if !owns { c.metrics.tenantsSkipped.Inc() continue @@ -280,12 +289,14 @@ func (c *Compactor) loadWork(ctx context.Context, ch chan<- tenantTable) error { } if err := tenants.Err(); err != nil { + level.Error(c.logger).Log("msg", "error iterating tenants", "err", err) return errors.Wrap(err, "iterating tenants") } } if err := tables.Err(); err != nil { + level.Error(c.logger).Log("msg", "error iterating tables", "err", err) return errors.Wrap(err, "iterating tables") } @@ -330,7 +341,7 @@ func (c *Compactor) runWorkers(ctx context.Context, ch <-chan tenantTable) error } func (c *Compactor) compactTenantTable(ctx context.Context, tt tenantTable) error { - level.Info(c.logger).Log("msg", "compacting", "org_id", tt.tenant, "table", tt.table, "ownership", tt.ownershipRange) + level.Info(c.logger).Log("msg", "compacting", "org_id", tt.tenant, "table", tt.table, "ownership", tt.ownershipRange.String()) return c.controller.compactTenant(ctx, tt.table, tt.tenant, tt.ownershipRange) } diff --git a/pkg/bloomcompactor/controller.go b/pkg/bloomcompactor/controller.go index ef41ec2d8efbb..8623d1acb7914 100644 --- a/pkg/bloomcompactor/controller.go +++ b/pkg/bloomcompactor/controller.go @@ -70,7 +70,7 @@ func (s *SimpleBloomController) compactTenant( tenant string, ownershipRange v1.FingerprintBounds, ) error { - logger := log.With(s.logger, "ownership", ownershipRange, "org_id", tenant, "table", table.Addr()) + logger := log.With(s.logger, "org_id", tenant, "table", table.Addr(), "ownership", ownershipRange.String()) client, err := s.bloomStore.Client(table.ModelTime()) if err != nil { @@ -92,6 +92,8 @@ func (s *SimpleBloomController) compactTenant( return 
errors.Wrap(err, "failed to get metas") } + level.Debug(logger).Log("msg", "found relevant metas", "metas", len(metas)) + // build compaction plans work, err := s.findOutdatedGaps(ctx, tenant, table, ownershipRange, metas, logger) if err != nil { @@ -126,7 +128,13 @@ func (s *SimpleBloomController) compactTenant( superset = union[0] } - metas, err = s.bloomStore.FetchMetas( + level.Debug(logger).Log( + "msg", "looking for superset metas", + "superset", superset.String(), + "superset_within", superset.Within(ownershipRange), + ) + + freshMetas, err := s.bloomStore.FetchMetas( ctx, bloomshipper.MetaSearchParams{ TenantID: tenant, @@ -139,37 +147,46 @@ func (s *SimpleBloomController) compactTenant( return errors.Wrap(err, "failed to get meta supseret range") } + level.Debug(logger).Log( + "msg", "found superset metas", + "metas", len(metas), + "fresh_metas", len(freshMetas), + "delta", len(freshMetas)-len(metas), + ) + // combine built and pre-existing metas // in preparation for removing outdated metas - metas = append(metas, built...) + combined := append(freshMetas, built...) 
- outdated := outdatedMetas(metas) + outdated := outdatedMetas(combined) + level.Debug(logger).Log("msg", "found outdated metas", "outdated", len(outdated)) for _, meta := range outdated { for _, block := range meta.Blocks { - if err := client.DeleteBlocks(ctx, []bloomshipper.BlockRef{block}); err != nil { + err := client.DeleteBlocks(ctx, []bloomshipper.BlockRef{block}) + if err != nil { if client.IsObjectNotFoundErr(err) { - level.Debug(logger).Log("msg", "block not found while attempting delete, continuing", "block", block) - continue + level.Debug(logger).Log("msg", "block not found while attempting delete, continuing", "block", block.String()) + } else { + level.Error(logger).Log("msg", "failed to delete blocks", "err", err, "block", block.String()) } - - level.Error(logger).Log("msg", "failed to delete blocks", "err", err) - return errors.Wrap(err, "failed to delete blocks") } + level.Debug(logger).Log("msg", "removed outdated block", "block", block.String()) } - if err := client.DeleteMetas(ctx, []bloomshipper.MetaRef{meta.MetaRef}); err != nil { + err = client.DeleteMetas(ctx, []bloomshipper.MetaRef{meta.MetaRef}) + if err != nil { if client.IsObjectNotFoundErr(err) { - level.Debug(logger).Log("msg", "meta not found while attempting delete, continuing", "meta", meta.MetaRef) + level.Debug(logger).Log("msg", "meta not found while attempting delete, continuing", "meta", meta.MetaRef.String()) } else { - level.Error(logger).Log("msg", "failed to delete metas", "err", err) - return errors.Wrap(err, "failed to delete metas") + level.Error(logger).Log("msg", "failed to delete meta", "err", err, "meta", meta.MetaRef.String()) + return errors.Wrap(err, "failed to delete meta") } } + level.Debug(logger).Log("msg", "removed outdated meta", "meta", meta.MetaRef.String()) } level.Debug(logger).Log("msg", "finished compaction") return nil - } func (s *SimpleBloomController) findOutdatedGaps( @@ -271,6 +288,7 @@ func (s *SimpleBloomController) buildGaps( for i := range 
plan.gaps { gap := plan.gaps[i] + logger := log.With(logger, "gap", gap.bounds.String(), "tsdb", plan.tsdb.Name()) meta := bloomshipper.Meta{ MetaRef: bloomshipper.MetaRef{ @@ -304,9 +322,11 @@ func (s *SimpleBloomController) buildGaps( blocksIter, s.rwFn, s.metrics, - log.With(logger, "tsdb", plan.tsdb.Name(), "ownership", gap), + logger, ) + level.Debug(logger).Log("msg", "generating blocks", "overlapping_blocks", len(gap.blocks)) + newBlocks := gen.Generate(ctx) if err != nil { level.Error(logger).Log("msg", "failed to generate bloom", "err", err) @@ -333,6 +353,8 @@ func (s *SimpleBloomController) buildGaps( blocksIter.Close() return nil, errors.Wrap(err, "failed to write block") } + s.metrics.blocksCreated.Inc() + level.Debug(logger).Log("msg", "uploaded block", "block", built.BlockRef.String()) meta.Blocks = append(meta.Blocks, built.BlockRef) } @@ -357,8 +379,10 @@ func (s *SimpleBloomController) buildGaps( level.Error(logger).Log("msg", "failed to write meta", "err", err) return nil, errors.Wrap(err, "failed to write meta") } - created = append(created, meta) + s.metrics.metasCreated.Inc() + level.Debug(logger).Log("msg", "uploaded meta", "meta", meta.MetaRef.String()) + created = append(created, meta) totalSeries += uint64(seriesItrWithCounter.Count()) } } diff --git a/pkg/bloomcompactor/metrics.go b/pkg/bloomcompactor/metrics.go index 350e3ed7e480e..dc2b67da53199 100644 --- a/pkg/bloomcompactor/metrics.go +++ b/pkg/bloomcompactor/metrics.go @@ -31,6 +31,9 @@ type Metrics struct { tenantsCompleted *prometheus.CounterVec tenantsCompletedTime *prometheus.HistogramVec tenantsSeries prometheus.Histogram + + blocksCreated prometheus.Counter + metasCreated prometheus.Counter } func NewMetrics(r prometheus.Registerer, bloomMetrics *v1.Metrics) *Metrics { @@ -115,6 +118,18 @@ func NewMetrics(r prometheus.Registerer, bloomMetrics *v1.Metrics) *Metrics { // Up to 10M series per tenant, way more than what we expect given our max_global_streams_per_user limits 
Buckets: prometheus.ExponentialBucketsRange(1, 10000000, 10), }), + blocksCreated: promauto.With(r).NewCounter(prometheus.CounterOpts{ + Namespace: metricsNamespace, + Subsystem: metricsSubsystem, + Name: "blocks_created", + Help: "Number of blocks created", + }), + metasCreated: promauto.With(r).NewCounter(prometheus.CounterOpts{ + Namespace: metricsNamespace, + Subsystem: metricsSubsystem, + Name: "metas_created", + Help: "Number of metas created", + }), } return &m From 2d00cc858081cf172e279538d28868b7106b9a91 Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Fri, 16 Feb 2024 12:04:40 -0800 Subject: [PATCH 03/14] fixes extension+path resolution in bloom compactor Signed-off-by: Owen Diehl --- pkg/bloomcompactor/tsdb.go | 4 ++-- pkg/storage/stores/shipper/bloomshipper/resolver.go | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/bloomcompactor/tsdb.go b/pkg/bloomcompactor/tsdb.go index 6159ce02a804a..c639d3db61474 100644 --- a/pkg/bloomcompactor/tsdb.go +++ b/pkg/bloomcompactor/tsdb.go @@ -236,8 +236,8 @@ func NewTSDBStores( if err != nil { return nil, errors.Wrap(err, "failed to create object client") } - prefix := path.Join(cfg.IndexTables.PathPrefix, cfg.IndexTables.Prefix) - res.stores[i] = NewBloomTSDBStore(storage.NewIndexStorageClient(c, prefix)) + // prefix := path.Join(cfg.IndexTables.PathPrefix, cfg.IndexTables.Prefix) + res.stores[i] = NewBloomTSDBStore(storage.NewIndexStorageClient(c, cfg.IndexTables.PathPrefix)) } } diff --git a/pkg/storage/stores/shipper/bloomshipper/resolver.go b/pkg/storage/stores/shipper/bloomshipper/resolver.go index 88235b9e153f0..7d224b9f01392 100644 --- a/pkg/storage/stores/shipper/bloomshipper/resolver.go +++ b/pkg/storage/stores/shipper/bloomshipper/resolver.go @@ -39,7 +39,7 @@ func (defaultKeyResolver) Meta(ref MetaRef) Location { fmt.Sprintf("%v", ref.TableName), ref.TenantID, MetasPrefix, - fmt.Sprintf("%v-%v%s", ref.Bounds, ref.Checksum, extJSON), + fmt.Sprintf("%v-%x%s", ref.Bounds, ref.Checksum, 
extJSON), } } From 4008e6c19d5eefa0f6f04f0757dd8189d25ba861 Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Fri, 16 Feb 2024 12:05:45 -0800 Subject: [PATCH 04/14] resolves old metas used in deletion prior to building new ones avoids race conditions in storage Signed-off-by: Owen Diehl --- pkg/bloomcompactor/bloomcompactor.go | 2 +- pkg/bloomcompactor/controller.go | 96 +++++++++++++++++----------- 2 files changed, 59 insertions(+), 39 deletions(-) diff --git a/pkg/bloomcompactor/bloomcompactor.go b/pkg/bloomcompactor/bloomcompactor.go index ef41edca95572..cc96cc7219e8d 100644 --- a/pkg/bloomcompactor/bloomcompactor.go +++ b/pkg/bloomcompactor/bloomcompactor.go @@ -214,7 +214,7 @@ func (c *Compactor) ownsTenant(tenant string) (v1.FingerprintBounds, bool, error // runs a single round of compaction for all relevant tenants and tables func (c *Compactor) runOne(ctx context.Context) error { - level.Info(c.logger).Log("msg", "running bloom compaction") + level.Info(c.logger).Log("msg", "running bloom compaction", "workers", c.cfg.WorkerParallelism) var workersErr error var wg sync.WaitGroup ch := make(chan tenantTable) diff --git a/pkg/bloomcompactor/controller.go b/pkg/bloomcompactor/controller.go index 8623d1acb7914..75a9755fa3a83 100644 --- a/pkg/bloomcompactor/controller.go +++ b/pkg/bloomcompactor/controller.go @@ -94,6 +94,13 @@ func (s *SimpleBloomController) compactTenant( level.Debug(logger).Log("msg", "found relevant metas", "metas", len(metas)) + // fetch all metas overlapping all metas overlapping our ownership range so we can safely + // check which metas can be deleted even if they only partially overlap out ownership range + superset, err := s.fetchSuperSet(ctx, tenant, table, ownershipRange, metas, logger) + if err != nil { + return errors.Wrap(err, "failed to fetch superset") + } + // build compaction plans work, err := s.findOutdatedGaps(ctx, tenant, table, ownershipRange, metas, logger) if err != nil { @@ -106,6 +113,50 @@ func (s 
*SimpleBloomController) compactTenant( return errors.Wrap(err, "failed to build gaps") } + // combine built and superset metas + // in preparation for removing outdated ones + combined := append(superset, built...) + + outdated := outdatedMetas(combined) + level.Debug(logger).Log("msg", "found outdated metas", "outdated", len(outdated)) + for _, meta := range outdated { + for _, block := range meta.Blocks { + err := client.DeleteBlocks(ctx, []bloomshipper.BlockRef{block}) + if err != nil { + if client.IsObjectNotFoundErr(err) { + level.Debug(logger).Log("msg", "block not found while attempting delete, continuing", "block", block.String()) + } else { + level.Error(logger).Log("msg", "failed to delete block", "err", err, "block", block.String()) + return errors.Wrap(err, "failed to delete block") + } + } + level.Debug(logger).Log("msg", "removed outdated block", "block", block.String()) + } + + err = client.DeleteMetas(ctx, []bloomshipper.MetaRef{meta.MetaRef}) + if err != nil { + if client.IsObjectNotFoundErr(err) { + level.Debug(logger).Log("msg", "meta not found while attempting delete, continuing", "meta", meta.MetaRef.String()) + } else { + level.Error(logger).Log("msg", "failed to delete meta", "err", err, "meta", meta.MetaRef.String()) + return errors.Wrap(err, "failed to delete meta") + } + } + level.Debug(logger).Log("msg", "removed outdated meta", "meta", meta.MetaRef.String()) + } + + level.Debug(logger).Log("msg", "finished compaction") + return nil +} + +func (s *SimpleBloomController) fetchSuperSet( + ctx context.Context, + tenant string, + table config.DayTable, + ownershipRange v1.FingerprintBounds, + metas []bloomshipper.Meta, + logger log.Logger, +) ([]bloomshipper.Meta, error) { // in order to delete outdates metas which only partially fall within the ownership range, // we need to fetcha all metas in the entire bound range of the first set of metas we've resolved /* @@ -123,7 +174,7 @@ func (s *SimpleBloomController) compactTenant( union := 
superset.Union(meta.Bounds) if len(union) > 1 { level.Error(logger).Log("msg", "meta bounds union is not a single range", "union", union) - return errors.New("meta bounds union is not a single range") + return nil, errors.New("meta bounds union is not a single range") } superset = union[0] } @@ -134,7 +185,7 @@ func (s *SimpleBloomController) compactTenant( "superset_within", superset.Within(ownershipRange), ) - freshMetas, err := s.bloomStore.FetchMetas( + supersetMetas, err := s.bloomStore.FetchMetas( ctx, bloomshipper.MetaSearchParams{ TenantID: tenant, @@ -142,51 +193,20 @@ func (s *SimpleBloomController) compactTenant( Keyspace: superset, }, ) + if err != nil { level.Error(logger).Log("msg", "failed to get meta superset range", "err", err, "superset", superset) - return errors.Wrap(err, "failed to get meta supseret range") + return nil, errors.Wrap(err, "failed to get meta supseret range") } level.Debug(logger).Log( "msg", "found superset metas", "metas", len(metas), - "fresh_metas", len(freshMetas), - "delta", len(freshMetas)-len(metas), + "fresh_metas", len(supersetMetas), + "delta", len(supersetMetas)-len(metas), ) - // combine built and pre-existing metas - // in preparation for removing outdated metas - combined := append(freshMetas, built...) 
- - outdated := outdatedMetas(combined) - level.Debug(logger).Log("msg", "found outdated metas", "outdated", len(outdated)) - for _, meta := range outdated { - for _, block := range meta.Blocks { - err := client.DeleteBlocks(ctx, []bloomshipper.BlockRef{block}) - if err != nil { - if client.IsObjectNotFoundErr(err) { - level.Debug(logger).Log("msg", "block not found while attempting delete, continuing", "block", block.String()) - } else { - level.Error(logger).Log("msg", "failed to delete blocks", "err", err, "block", block.String()) - } - } - level.Debug(logger).Log("msg", "removed outdated block", "block", block.String()) - } - - err = client.DeleteMetas(ctx, []bloomshipper.MetaRef{meta.MetaRef}) - if err != nil { - if client.IsObjectNotFoundErr(err) { - level.Debug(logger).Log("msg", "meta not found while attempting delete, continuing", "meta", meta.MetaRef.String()) - } else { - level.Error(logger).Log("msg", "failed to delete meta", "err", err, "meta", meta.MetaRef.String()) - return errors.Wrap(err, "failed to delete meta") - } - } - level.Debug(logger).Log("msg", "removed outdated meta", "meta", meta.MetaRef.String()) - } - - level.Debug(logger).Log("msg", "finished compaction") - return nil + return supersetMetas, nil } func (s *SimpleBloomController) findOutdatedGaps( From 309decda4a8d7cacff4db1ac88eab1720876f906 Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Fri, 16 Feb 2024 12:34:46 -0800 Subject: [PATCH 05/14] remove unused blocktombstones Signed-off-by: Owen Diehl --- pkg/bloomgateway/util_test.go | 3 +- .../stores/shipper/bloomshipper/client.go | 4 -- .../shipper/bloomshipper/client_test.go | 6 +-- .../shipper/bloomshipper/fetcher_test.go | 3 +- .../stores/shipper/bloomshipper/shipper.go | 21 ++------- .../shipper/bloomshipper/shipper_test.go | 43 ------------------- .../stores/shipper/bloomshipper/store_test.go | 3 +- 7 files changed, 8 insertions(+), 75 deletions(-) diff --git a/pkg/bloomgateway/util_test.go b/pkg/bloomgateway/util_test.go index 
6bc43cf794342..e9776dfef78f5 100644 --- a/pkg/bloomgateway/util_test.go +++ b/pkg/bloomgateway/util_test.go @@ -323,8 +323,7 @@ func createBlocks(t *testing.T, tenant string, n int, from, through model.Time, MetaRef: bloomshipper.MetaRef{ Ref: ref, }, - BlockTombstones: []bloomshipper.BlockRef{}, - Blocks: []bloomshipper.BlockRef{blockRef}, + Blocks: []bloomshipper.BlockRef{blockRef}, } block, data, _ := v1.MakeBlock(t, n, fromFp, throughFp, from, through) // Printing fingerprints and the log lines of its chunks comes handy for debugging... diff --git a/pkg/storage/stores/shipper/bloomshipper/client.go b/pkg/storage/stores/shipper/bloomshipper/client.go index 882b0eab41c24..240f2b5166588 100644 --- a/pkg/storage/stores/shipper/bloomshipper/client.go +++ b/pkg/storage/stores/shipper/bloomshipper/client.go @@ -88,10 +88,6 @@ type Meta struct { // The specific TSDB files used to generate the block. Sources []tsdb.SingleTenantTSDBIdentifier - // TODO(owen-d): remove, unused - // Old blocks which can be deleted in the future. These should be from previous compaction rounds. 
- BlockTombstones []BlockRef - // A list of blocks that were generated Blocks []BlockRef } diff --git a/pkg/storage/stores/shipper/bloomshipper/client_test.go b/pkg/storage/stores/shipper/bloomshipper/client_test.go index 897ed519946a7..e5bbe3b5b1bf5 100644 --- a/pkg/storage/stores/shipper/bloomshipper/client_test.go +++ b/pkg/storage/stores/shipper/bloomshipper/client_test.go @@ -63,8 +63,7 @@ func putMeta(c *BloomClient, tenant string, start model.Time, minFp, maxFp model // EndTimestamp: start.Add(12 * time.Hour), }, }, - Blocks: []BlockRef{}, - BlockTombstones: []BlockRef{}, + Blocks: []BlockRef{}, } raw, _ := json.Marshal(meta) return meta, c.client.PutObject(context.Background(), c.Meta(meta.MetaRef).Addr(), bytes.NewReader(raw)) @@ -129,8 +128,7 @@ func TestBloomClient_PutMeta(t *testing.T) { // EndTimestamp: start.Add(12 * time.Hour), }, }, - Blocks: []BlockRef{}, - BlockTombstones: []BlockRef{}, + Blocks: []BlockRef{}, } err := c.PutMeta(ctx, meta) diff --git a/pkg/storage/stores/shipper/bloomshipper/fetcher_test.go b/pkg/storage/stores/shipper/bloomshipper/fetcher_test.go index 40a695e0b8e6c..962bebb9956fd 100644 --- a/pkg/storage/stores/shipper/bloomshipper/fetcher_test.go +++ b/pkg/storage/stores/shipper/bloomshipper/fetcher_test.go @@ -34,8 +34,7 @@ func makeMetas(t *testing.T, schemaCfg config.SchemaConfig, ts model.Time, keysp EndTimestamp: ts, }, }, - BlockTombstones: []BlockRef{}, - Blocks: []BlockRef{}, + Blocks: []BlockRef{}, } } return metas diff --git a/pkg/storage/stores/shipper/bloomshipper/shipper.go b/pkg/storage/stores/shipper/bloomshipper/shipper.go index fd755b0a204a7..3267886ac063e 100644 --- a/pkg/storage/stores/shipper/bloomshipper/shipper.go +++ b/pkg/storage/stores/shipper/bloomshipper/shipper.go @@ -55,30 +55,15 @@ func (s *Shipper) Stop() { } // BlocksForMetas returns all the blocks from all the metas listed that are within the requested bounds -// and not tombstoned in any of the metas -func BlocksForMetas(metas []Meta, interval 
Interval, keyspaces []v1.FingerprintBounds) []BlockRef { - blocks := make(map[BlockRef]bool) // block -> isTombstoned - +func BlocksForMetas(metas []Meta, interval Interval, keyspaces []v1.FingerprintBounds) (refs []BlockRef) { for _, meta := range metas { - for _, tombstone := range meta.BlockTombstones { - blocks[tombstone] = true - } for _, block := range meta.Blocks { - tombstoned, ok := blocks[block] - if ok && tombstoned { - // skip tombstoned blocks - continue + if !isOutsideRange(block, interval, keyspaces) { + refs = append(refs, block) } - blocks[block] = false } } - refs := make([]BlockRef, 0, len(blocks)) - for ref, tombstoned := range blocks { - if !tombstoned && !isOutsideRange(ref, interval, keyspaces) { - refs = append(refs, ref) - } - } sort.Slice(refs, func(i, j int) bool { return refs[i].Bounds.Less(refs[j].Bounds) }) diff --git a/pkg/storage/stores/shipper/bloomshipper/shipper_test.go b/pkg/storage/stores/shipper/bloomshipper/shipper_test.go index c9e47f91fea28..e03d72c26ba37 100644 --- a/pkg/storage/stores/shipper/bloomshipper/shipper_test.go +++ b/pkg/storage/stores/shipper/bloomshipper/shipper_test.go @@ -14,49 +14,6 @@ import ( ) func TestBloomShipper_findBlocks(t *testing.T) { - t.Run("expected block that are specified in tombstones to be filtered out", func(t *testing.T) { - metas := []Meta{ - { - Blocks: []BlockRef{ - //this blockRef is marked as deleted in the next meta - createMatchingBlockRef(1), - createMatchingBlockRef(2), - }, - }, - { - Blocks: []BlockRef{ - //this blockRef is marked as deleted in the next meta - createMatchingBlockRef(3), - createMatchingBlockRef(4), - }, - }, - { - BlockTombstones: []BlockRef{ - createMatchingBlockRef(1), - createMatchingBlockRef(3), - }, - Blocks: []BlockRef{ - createMatchingBlockRef(5), - }, - }, - } - - ts := model.Now() - - interval := NewInterval( - ts.Add(-2*time.Hour), - ts.Add(-1*time.Hour), - ) - blocks := BlocksForMetas(metas, interval, []v1.FingerprintBounds{{Min: 100, Max: 200}}) - - 
expectedBlockRefs := []BlockRef{ - createMatchingBlockRef(2), - createMatchingBlockRef(4), - createMatchingBlockRef(5), - } - require.ElementsMatch(t, expectedBlockRefs, blocks) - }) - tests := map[string]struct { minFingerprint uint64 maxFingerprint uint64 diff --git a/pkg/storage/stores/shipper/bloomshipper/store_test.go b/pkg/storage/stores/shipper/bloomshipper/store_test.go index ca86cb94fa963..c99aa46df4bf3 100644 --- a/pkg/storage/stores/shipper/bloomshipper/store_test.go +++ b/pkg/storage/stores/shipper/bloomshipper/store_test.go @@ -83,8 +83,7 @@ func createMetaInStorage(store *BloomStore, tenant string, start model.Time, min // EndTimestamp: start.Add(12 * time.Hour), }, }, - Blocks: []BlockRef{}, - BlockTombstones: []BlockRef{}, + Blocks: []BlockRef{}, } err := store.storeDo(start, func(s *bloomStoreEntry) error { raw, _ := json.Marshal(meta) From 155b58b0b8c0de152eca9e1d6088fc99be6659c4 Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Fri, 16 Feb 2024 13:19:13 -0800 Subject: [PATCH 06/14] links flag with var Signed-off-by: Owen Diehl --- pkg/validation/limits.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/pkg/validation/limits.go b/pkg/validation/limits.go index 9627718aa8ec7..00ee2e152144a 100644 --- a/pkg/validation/limits.go +++ b/pkg/validation/limits.go @@ -339,7 +339,12 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { f.IntVar(&l.BloomGatewayBlocksDownloadingParallelism, "bloom-gateway.blocks-downloading-parallelism", 50, "Maximum number of blocks will be downloaded in parallel by the Bloom Gateway.") f.DurationVar(&l.BloomGatewayCacheKeyInterval, "bloom-gateway.cache-key-interval", 15*time.Minute, "Interval for computing the cache key in the Bloom Gateway.") _ = l.BloomCompactorMaxBlockSize.Set(defaultBloomCompactorMaxBlockSize) - f.Var(&l.BloomCompactorMaxBlockSize, "bloom-compactor.max-block-size", "The maximum bloom block size. A value of 0 sets an unlimited size. Default is 200MB. 
The actual block size might exceed this limit since blooms will be added to blocks until the block exceeds the maximum block size.") + f.Var(&l.BloomCompactorMaxBlockSize, "bloom-compactor.max-block-size", + fmt.Sprintf( + "The maximum bloom block size. A value of 0 sets an unlimited size. Default is %s. The actual block size might exceed this limit since blooms will be added to blocks until the block exceeds the maximum block size.", + defaultBloomCompactorMaxBlockSize, + ), + ) l.ShardStreams = &shardstreams.Config{} l.ShardStreams.RegisterFlagsWithPrefix("shard-streams", f) From 19e32f48acb9b6b006f1d10e966943439ac7c682 Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Fri, 16 Feb 2024 13:19:32 -0800 Subject: [PATCH 07/14] correctly builds fromTS in block metadata Signed-off-by: Owen Diehl --- pkg/storage/bloom/v1/index.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/storage/bloom/v1/index.go b/pkg/storage/bloom/v1/index.go index e3a14dc5453ea..58d43b8cd0aca 100644 --- a/pkg/storage/bloom/v1/index.go +++ b/pkg/storage/bloom/v1/index.go @@ -234,8 +234,8 @@ func aggregateHeaders(xs []SeriesHeader) SeriesHeader { Bounds: NewBounds(fromFp, throughFP), } - for _, x := range xs { - if x.FromTs < res.FromTs { + for i, x := range xs { + if i == 0 || x.FromTs < res.FromTs { res.FromTs = x.FromTs } if x.ThroughTs > res.ThroughTs { From 5b9fc6228d57cf67719058aa122d784e2173d839 Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Sun, 18 Feb 2024 11:10:30 -0800 Subject: [PATCH 08/14] always set blockLoadingIter underlying iter Signed-off-by: Owen Diehl --- pkg/bloomcompactor/batch.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/pkg/bloomcompactor/batch.go b/pkg/bloomcompactor/batch.go index bed0834a86b74..920bff1decc8f 100644 --- a/pkg/bloomcompactor/batch.go +++ b/pkg/bloomcompactor/batch.go @@ -286,11 +286,10 @@ func (i *blockLoadingIter) loadNext() bool { // check if there are more overlapping groups to load if 
!i.overlapping.Next() { i.iter = v1.NewEmptyIter[*v1.SeriesWithBloom]() - return false - } + if i.overlapping.Err() != nil { + i.err = i.overlapping.Err() + } - if i.overlapping.Err() != nil { - i.err = i.overlapping.Err() return false } @@ -300,7 +299,7 @@ func (i *blockLoadingIter) loadNext() bool { filtered := v1.NewFilterIter[*bloomshipper.CloseableBlockQuerier](loader, i.filter) iters := make([]v1.PeekingIterator[*v1.SeriesWithBloom], 0, len(blockRefs)) - for filtered.Next() && filtered.Err() == nil { + for filtered.Next() { bq := loader.At() if _, ok := i.loaded[bq]; !ok { i.loaded[bq] = struct{}{} @@ -309,8 +308,9 @@ func (i *blockLoadingIter) loadNext() bool { iters = append(iters, iter) } - if loader.Err() != nil { - i.err = loader.Err() + if err := filtered.Err(); err != nil { + i.err = err + i.iter = v1.NewEmptyIter[*v1.SeriesWithBloom]() return false } From 7a2de0dd2448384707177154b32566af84396b21 Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Sun, 18 Feb 2024 11:10:52 -0800 Subject: [PATCH 09/14] check store iter err in mergebuilder Signed-off-by: Owen Diehl --- pkg/storage/bloom/v1/builder.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pkg/storage/bloom/v1/builder.go b/pkg/storage/bloom/v1/builder.go index d2d51b557e5d3..c016cf651174c 100644 --- a/pkg/storage/bloom/v1/builder.go +++ b/pkg/storage/bloom/v1/builder.go @@ -606,6 +606,10 @@ func (mb *MergeBuilder) Build(builder *BlockBuilder) (uint32, error) { } } + if err := mb.store.Err(); err != nil { + return 0, errors.Wrap(err, "iterating store") + } + checksum, err := builder.Close() if err != nil { return 0, errors.Wrap(err, "closing block") From 0f9920c941e7820ebeb366b2576845820e380a43 Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Sun, 18 Feb 2024 11:26:03 -0800 Subject: [PATCH 10/14] report keyspace progress during block uploading Signed-off-by: Owen Diehl --- pkg/bloomcompactor/controller.go | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git 
a/pkg/bloomcompactor/controller.go b/pkg/bloomcompactor/controller.go index 75a9755fa3a83..b7d9f915a0eae 100644 --- a/pkg/bloomcompactor/controller.go +++ b/pkg/bloomcompactor/controller.go @@ -374,7 +374,15 @@ func (s *SimpleBloomController) buildGaps( return nil, errors.Wrap(err, "failed to write block") } s.metrics.blocksCreated.Inc() - level.Debug(logger).Log("msg", "uploaded block", "block", built.BlockRef.String()) + + totalGapKeyspace := (gap.bounds.Max - gap.bounds.Min) + progress := (built.Bounds.Max - gap.bounds.Min) + pct := float64(progress) / float64(totalGapKeyspace) * 100 + level.Debug(logger).Log( + "msg", "uploaded block", + "block", built.BlockRef.String(), + "progress_pct", fmt.Sprintf("%.2f", pct), + ) meta.Blocks = append(meta.Blocks, built.BlockRef) } @@ -388,6 +396,7 @@ func (s *SimpleBloomController) buildGaps( blocksIter.Close() // Write the new meta + // TODO(owen-d): put total size in log, total time in metrics+log ref, err := bloomshipper.MetaRefFrom(tenant, table.Addr(), gap.bounds, meta.Sources, meta.Blocks) if err != nil { level.Error(logger).Log("msg", "failed to checksum meta", "err", err) From 2d09d201ed8a190ac886c2e0fe95d90a90e132d4 Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Sun, 18 Feb 2024 12:15:35 -0800 Subject: [PATCH 11/14] optimization: ignore fetching superset metas when unnecessary Signed-off-by: Owen Diehl --- pkg/bloomcompactor/controller.go | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/pkg/bloomcompactor/controller.go b/pkg/bloomcompactor/controller.go index b7d9f915a0eae..cf509ed5a94b2 100644 --- a/pkg/bloomcompactor/controller.go +++ b/pkg/bloomcompactor/controller.go @@ -149,6 +149,7 @@ func (s *SimpleBloomController) compactTenant( return nil } +// fetchSuperSet fetches all metas which overlap the ownership range of the first set of metas we've resolved func (s *SimpleBloomController) fetchSuperSet( ctx context.Context, tenant string, @@ -179,12 +180,22 @@ func (s 
*SimpleBloomController) fetchSuperSet( superset = union[0] } + within := superset.Within(ownershipRange) level.Debug(logger).Log( "msg", "looking for superset metas", "superset", superset.String(), - "superset_within", superset.Within(ownershipRange), + "superset_within", within, ) + if within { + // we don't need to fetch any more metas + // NB(owen-d): here we copy metas into the output. This is slightly inefficient, but + // helps prevent mutability bugs by not returning the same slice as the input. + results := make([]bloomshipper.Meta, len(metas)) + copy(results, metas) + return results, nil + } + supersetMetas, err := s.bloomStore.FetchMetas( ctx, bloomshipper.MetaSearchParams{ From 417a07d26c441fb812566b8102d480e5eb4835aa Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Sun, 18 Feb 2024 13:13:12 -0800 Subject: [PATCH 12/14] chunks iterated vs skipped metrics Signed-off-by: Owen Diehl --- pkg/bloomcompactor/spec.go | 7 +++++-- pkg/storage/bloom/v1/builder.go | 11 +++++++++++ pkg/storage/bloom/v1/builder_test.go | 3 ++- pkg/storage/bloom/v1/metrics.go | 8 ++++++++ 4 files changed, 26 insertions(+), 3 deletions(-) diff --git a/pkg/bloomcompactor/spec.go b/pkg/bloomcompactor/spec.go index 67d41b650e375..cb030dfb59131 100644 --- a/pkg/bloomcompactor/spec.go +++ b/pkg/bloomcompactor/spec.go @@ -138,7 +138,7 @@ func (s *SimpleBloomGenerator) Generate(ctx context.Context) v1.Iterator[*v1.Blo ) } - return NewLazyBlockBuilderIterator(ctx, s.opts, s.populator(ctx), s.readWriterFn, series, s.blocksIter) + return NewLazyBlockBuilderIterator(ctx, s.opts, s.metrics, s.populator(ctx), s.readWriterFn, series, s.blocksIter) } // LazyBlockBuilderIterator is a lazy iterator over blocks that builds @@ -146,6 +146,7 @@ func (s *SimpleBloomGenerator) Generate(ctx context.Context) v1.Iterator[*v1.Blo type LazyBlockBuilderIterator struct { ctx context.Context opts v1.BlockOptions + metrics *Metrics populate func(*v1.Series, *v1.Bloom) error readWriterFn func() (v1.BlockWriter,
v1.BlockReader) series v1.PeekingIterator[*v1.Series] @@ -158,6 +159,7 @@ type LazyBlockBuilderIterator struct { func NewLazyBlockBuilderIterator( ctx context.Context, opts v1.BlockOptions, + metrics *Metrics, populate func(*v1.Series, *v1.Bloom) error, readWriterFn func() (v1.BlockWriter, v1.BlockReader), series v1.PeekingIterator[*v1.Series], @@ -166,6 +168,7 @@ func NewLazyBlockBuilderIterator( return &LazyBlockBuilderIterator{ ctx: ctx, opts: opts, + metrics: metrics, populate: populate, readWriterFn: readWriterFn, series: series, @@ -189,7 +192,7 @@ func (b *LazyBlockBuilderIterator) Next() bool { return false } - mergeBuilder := v1.NewMergeBuilder(b.blocks, b.series, b.populate) + mergeBuilder := v1.NewMergeBuilder(b.blocks, b.series, b.populate, b.metrics.bloomMetrics) writer, reader := b.readWriterFn() blockBuilder, err := v1.NewBlockBuilder(b.opts, writer) if err != nil { diff --git a/pkg/storage/bloom/v1/builder.go b/pkg/storage/bloom/v1/builder.go index c016cf651174c..b094b847f2ef5 100644 --- a/pkg/storage/bloom/v1/builder.go +++ b/pkg/storage/bloom/v1/builder.go @@ -526,6 +526,7 @@ type MergeBuilder struct { store Iterator[*Series] // Add chunks to a bloom populate func(*Series, *Bloom) error + metrics *Metrics } // NewMergeBuilder is a specific builder which does the following: @@ -536,11 +537,13 @@ func NewMergeBuilder( blocks Iterator[*SeriesWithBloom], store Iterator[*Series], populate func(*Series, *Bloom) error, + metrics *Metrics, ) *MergeBuilder { return &MergeBuilder{ blocks: blocks, store: store, populate: populate, + metrics: metrics, } } @@ -568,6 +571,8 @@ func (mb *MergeBuilder) Build(builder *BlockBuilder) (uint32, error) { nextInBlocks = deduped.At() } + var chunksIndexed, chunksCopied int + cur := nextInBlocks chunksToAdd := nextInStore.Chunks // The next series from the store doesn't exist in the blocks, so we add it @@ -583,8 +588,11 @@ func (mb *MergeBuilder) Build(builder *BlockBuilder) (uint32, error) { } else { // if the series 
already exists in the block, we only need to add the new chunks chunksToAdd = nextInStore.Chunks.Unless(nextInBlocks.Series.Chunks) + chunksCopied = len(nextInStore.Chunks) - len(chunksToAdd) } + chunksIndexed = len(chunksToAdd) + if len(chunksToAdd) > 0 { if err := mb.populate( &Series{ @@ -597,6 +605,9 @@ func (mb *MergeBuilder) Build(builder *BlockBuilder) (uint32, error) { } } + mb.metrics.chunksIndexed.WithLabelValues(chunkIndexedTypeIterated).Add(float64(chunksIndexed)) + mb.metrics.chunksIndexed.WithLabelValues(chunkIndexedTypeCopied).Add(float64(chunksCopied)) + blockFull, err := builder.AddSeries(*cur) if err != nil { return 0, errors.Wrap(err, "adding series to block") diff --git a/pkg/storage/bloom/v1/builder_test.go b/pkg/storage/bloom/v1/builder_test.go index 0122a35f7751c..0013ad8744579 100644 --- a/pkg/storage/bloom/v1/builder_test.go +++ b/pkg/storage/bloom/v1/builder_test.go @@ -226,7 +226,7 @@ func TestMergeBuilder(t *testing.T) { ) // Ensure that the merge builder combines all the blocks correctly - mergeBuilder := NewMergeBuilder(dedupedBlocks(blocks), storeItr, pop) + mergeBuilder := NewMergeBuilder(dedupedBlocks(blocks), storeItr, pop, NewMetrics(nil)) indexBuf := bytes.NewBuffer(nil) bloomsBuf := bytes.NewBuffer(nil) writer := NewMemoryBlockWriter(indexBuf, bloomsBuf) @@ -400,6 +400,7 @@ func TestMergeBuilder_Roundtrip(t *testing.T) { // We're not actually indexing new data in this test return nil }, + NewMetrics(nil), ) builder, err := NewBlockBuilder(DefaultBlockOptions, writer) require.Nil(t, err) diff --git a/pkg/storage/bloom/v1/metrics.go b/pkg/storage/bloom/v1/metrics.go index aa604c29f1573..bcda8186db55f 100644 --- a/pkg/storage/bloom/v1/metrics.go +++ b/pkg/storage/bloom/v1/metrics.go @@ -10,8 +10,12 @@ type Metrics struct { bloomSize prometheus.Histogram // size of the bloom filter in bytes hammingWeightRatio prometheus.Histogram // ratio of the hamming weight of the bloom filter to the number of bits in the bloom filter 
estimatedCount prometheus.Histogram // estimated number of elements in the bloom filter + chunksIndexed *prometheus.CounterVec } +const chunkIndexedTypeIterated = "iterated" +const chunkIndexedTypeCopied = "copied" + func NewMetrics(r prometheus.Registerer) *Metrics { return &Metrics{ sbfCreationTime: promauto.With(r).NewCounter(prometheus.CounterOpts{ @@ -33,5 +37,9 @@ func NewMetrics(r prometheus.Registerer) *Metrics { Help: "Estimated number of elements in the bloom filter", Buckets: prometheus.ExponentialBucketsRange(1, 33554432, 10), }), + chunksIndexed: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ + Name: "bloom_chunks_indexed", + Help: "Number of chunks indexed in bloom filters, partitioned by type. Type can be iterated or copied, where iterated indicates the chunk data was fetched and ngrams for it's contents generated whereas copied indicates the chunk already existed in another source block and was copied to the new block", + }, []string{"type"}), } } From 1a51d8ede07da26aebc96cff9a3b14dff6513b96 Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Sun, 18 Feb 2024 13:28:11 -0800 Subject: [PATCH 13/14] [convention] counters end in _total Signed-off-by: Owen Diehl --- pkg/bloomcompactor/metrics.go | 16 ++++++++-------- pkg/storage/bloom/v1/metrics.go | 4 ++-- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/pkg/bloomcompactor/metrics.go b/pkg/bloomcompactor/metrics.go index dc2b67da53199..7fe919a85624f 100644 --- a/pkg/bloomcompactor/metrics.go +++ b/pkg/bloomcompactor/metrics.go @@ -56,13 +56,13 @@ func NewMetrics(r prometheus.Registerer, bloomMetrics *v1.Metrics) *Metrics { compactionsStarted: promauto.With(r).NewCounter(prometheus.CounterOpts{ Namespace: metricsNamespace, Subsystem: metricsSubsystem, - Name: "compactions_started", + Name: "compactions_started_total", Help: "Total number of compactions started", }), compactionCompleted: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ Namespace: metricsNamespace, Subsystem: 
metricsSubsystem, - Name: "compactions_completed", + Name: "compactions_completed_total", Help: "Total number of compactions completed", }, []string{"status"}), compactionTime: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{ @@ -76,7 +76,7 @@ func NewMetrics(r prometheus.Registerer, bloomMetrics *v1.Metrics) *Metrics { tenantsDiscovered: promauto.With(r).NewCounter(prometheus.CounterOpts{ Namespace: metricsNamespace, Subsystem: metricsSubsystem, - Name: "tenants_discovered", + Name: "tenants_discovered_total", Help: "Number of tenants discovered during the current compaction run", }), tenantsOwned: promauto.With(r).NewCounter(prometheus.CounterOpts{ @@ -88,19 +88,19 @@ func NewMetrics(r prometheus.Registerer, bloomMetrics *v1.Metrics) *Metrics { tenantsSkipped: promauto.With(r).NewCounter(prometheus.CounterOpts{ Namespace: metricsNamespace, Subsystem: metricsSubsystem, - Name: "tenants_skipped", + Name: "tenants_skipped_total", Help: "Number of tenants skipped since they are not owned by this instance", }), tenantsStarted: promauto.With(r).NewCounter(prometheus.CounterOpts{ Namespace: metricsNamespace, Subsystem: metricsSubsystem, - Name: "tenants_started", + Name: "tenants_started_total", Help: "Number of tenants started to process during the current compaction run", }), tenantsCompleted: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ Namespace: metricsNamespace, Subsystem: metricsSubsystem, - Name: "tenants_completed", + Name: "tenants_completed_total", Help: "Number of tenants successfully processed during the current compaction run", }, []string{"status"}), tenantsCompletedTime: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{ @@ -121,13 +121,13 @@ func NewMetrics(r prometheus.Registerer, bloomMetrics *v1.Metrics) *Metrics { blocksCreated: promauto.With(r).NewCounter(prometheus.CounterOpts{ Namespace: metricsNamespace, Subsystem: metricsSubsystem, - Name: "blocks_created", + Name: "blocks_created_total", Help: "Number of blocks 
created", }), metasCreated: promauto.With(r).NewCounter(prometheus.CounterOpts{ Namespace: metricsNamespace, Subsystem: metricsSubsystem, - Name: "metas_created", + Name: "metas_created_total", Help: "Number of metas created", }), } diff --git a/pkg/storage/bloom/v1/metrics.go b/pkg/storage/bloom/v1/metrics.go index bcda8186db55f..f5568a9d76596 100644 --- a/pkg/storage/bloom/v1/metrics.go +++ b/pkg/storage/bloom/v1/metrics.go @@ -19,7 +19,7 @@ const chunkIndexedTypeCopied = "copied" func NewMetrics(r prometheus.Registerer) *Metrics { return &Metrics{ sbfCreationTime: promauto.With(r).NewCounter(prometheus.CounterOpts{ - Name: "bloom_creation_time", + Name: "bloom_creation_time_total", Help: "Time spent creating scalable bloom filters", }), bloomSize: promauto.With(r).NewHistogram(prometheus.HistogramOpts{ @@ -38,7 +38,7 @@ func NewMetrics(r prometheus.Registerer) *Metrics { Buckets: prometheus.ExponentialBucketsRange(1, 33554432, 10), }), chunksIndexed: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ - Name: "bloom_chunks_indexed", + Name: "bloom_chunks_indexed_total", Help: "Number of chunks indexed in bloom filters, partitioned by type. 
Type can be iterated or copied, where iterated indicates the chunk data was fetched and ngrams for it's contents generated whereas copied indicates the chunk already existed in another source block and was copied to the new block", }, []string{"type"}), } From c591438b56a27045945ae76accc2914934bc18b9 Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Mon, 19 Feb 2024 08:51:28 -0800 Subject: [PATCH 14/14] pr feedback Signed-off-by: Owen Diehl --- pkg/bloomcompactor/controller.go | 14 +++++++++++++- pkg/bloomcompactor/metrics.go | 14 ++++++++++++++ pkg/bloomcompactor/tsdb.go | 1 - 3 files changed, 27 insertions(+), 2 deletions(-) diff --git a/pkg/bloomcompactor/controller.go b/pkg/bloomcompactor/controller.go index cf509ed5a94b2..2a4ff6cd45242 100644 --- a/pkg/bloomcompactor/controller.go +++ b/pkg/bloomcompactor/controller.go @@ -94,7 +94,7 @@ func (s *SimpleBloomController) compactTenant( level.Debug(logger).Log("msg", "found relevant metas", "metas", len(metas)) - // fetch all metas overlapping all metas overlapping our ownership range so we can safely + // fetch all metas overlapping our ownership range so we can safely // check which metas can be deleted even if they only partially overlap out ownership range superset, err := s.fetchSuperSet(ctx, tenant, table, ownershipRange, metas, logger) if err != nil { @@ -119,6 +119,16 @@ func (s *SimpleBloomController) compactTenant( outdated := outdatedMetas(combined) level.Debug(logger).Log("msg", "found outdated metas", "outdated", len(outdated)) + + var ( + deletedMetas int + deletedBlocks int + ) + defer func() { + s.metrics.metasDeleted.Add(float64(deletedMetas)) + s.metrics.blocksDeleted.Add(float64(deletedBlocks)) + }() + for _, meta := range outdated { for _, block := range meta.Blocks { err := client.DeleteBlocks(ctx, []bloomshipper.BlockRef{block}) @@ -130,6 +140,7 @@ func (s *SimpleBloomController) compactTenant( return errors.Wrap(err, "failed to delete block") } } + deletedBlocks++ 
level.Debug(logger).Log("msg", "removed outdated block", "block", block.String()) } @@ -142,6 +153,7 @@ func (s *SimpleBloomController) compactTenant( return errors.Wrap(err, "failed to delete meta") } } + deletedMetas++ level.Debug(logger).Log("msg", "removed outdated meta", "meta", meta.MetaRef.String()) } diff --git a/pkg/bloomcompactor/metrics.go b/pkg/bloomcompactor/metrics.go index 7fe919a85624f..74378cb786429 100644 --- a/pkg/bloomcompactor/metrics.go +++ b/pkg/bloomcompactor/metrics.go @@ -33,7 +33,9 @@ type Metrics struct { tenantsSeries prometheus.Histogram blocksCreated prometheus.Counter + blocksDeleted prometheus.Counter metasCreated prometheus.Counter + metasDeleted prometheus.Counter } func NewMetrics(r prometheus.Registerer, bloomMetrics *v1.Metrics) *Metrics { @@ -124,12 +126,24 @@ func NewMetrics(r prometheus.Registerer, bloomMetrics *v1.Metrics) *Metrics { Name: "blocks_created_total", Help: "Number of blocks created", }), + blocksDeleted: promauto.With(r).NewCounter(prometheus.CounterOpts{ + Namespace: metricsNamespace, + Subsystem: metricsSubsystem, + Name: "blocks_deleted_total", + Help: "Number of blocks deleted", + }), metasCreated: promauto.With(r).NewCounter(prometheus.CounterOpts{ Namespace: metricsNamespace, Subsystem: metricsSubsystem, Name: "metas_created_total", Help: "Number of metas created", }), + metasDeleted: promauto.With(r).NewCounter(prometheus.CounterOpts{ + Namespace: metricsNamespace, + Subsystem: metricsSubsystem, + Name: "metas_deleted_total", + Help: "Number of metas deleted", + }), } return &m diff --git a/pkg/bloomcompactor/tsdb.go b/pkg/bloomcompactor/tsdb.go index c639d3db61474..7f5ec5eab81a3 100644 --- a/pkg/bloomcompactor/tsdb.go +++ b/pkg/bloomcompactor/tsdb.go @@ -236,7 +236,6 @@ func NewTSDBStores( if err != nil { return nil, errors.Wrap(err, "failed to create object client") } - // prefix := path.Join(cfg.IndexTables.PathPrefix, cfg.IndexTables.Prefix) res.stores[i] = 
NewBloomTSDBStore(storage.NewIndexStorageClient(c, cfg.IndexTables.PathPrefix)) } }