diff --git a/pkg/bloomcompactor/batch.go b/pkg/bloomcompactor/batch.go
index bed0834a86b74..920bff1decc8f 100644
--- a/pkg/bloomcompactor/batch.go
+++ b/pkg/bloomcompactor/batch.go
@@ -286,11 +286,10 @@ func (i *blockLoadingIter) loadNext() bool {
 	// check if there are more overlapping groups to load
 	if !i.overlapping.Next() {
 		i.iter = v1.NewEmptyIter[*v1.SeriesWithBloom]()
-		return false
-	}
+		if i.overlapping.Err() != nil {
+			i.err = i.overlapping.Err()
+		}
 
-	if i.overlapping.Err() != nil {
-		i.err = i.overlapping.Err()
 		return false
 	}
 
@@ -300,7 +299,7 @@ func (i *blockLoadingIter) loadNext() bool {
 	filtered := v1.NewFilterIter[*bloomshipper.CloseableBlockQuerier](loader, i.filter)
 
 	iters := make([]v1.PeekingIterator[*v1.SeriesWithBloom], 0, len(blockRefs))
-	for filtered.Next() && filtered.Err() == nil {
+	for filtered.Next() {
 		bq := loader.At()
 		if _, ok := i.loaded[bq]; !ok {
 			i.loaded[bq] = struct{}{}
@@ -309,8 +308,9 @@ func (i *blockLoadingIter) loadNext() bool {
 		iters = append(iters, iter)
 	}
 
-	if loader.Err() != nil {
-		i.err = loader.Err()
+	if err := filtered.Err(); err != nil {
+		i.err = err
+		i.iter = v1.NewEmptyIter[*v1.SeriesWithBloom]()
 		return false
 	}
 
diff --git a/pkg/bloomcompactor/bloomcompactor.go b/pkg/bloomcompactor/bloomcompactor.go
index 3bb1c815e8295..cc96cc7219e8d 100644
--- a/pkg/bloomcompactor/bloomcompactor.go
+++ b/pkg/bloomcompactor/bloomcompactor.go
@@ -214,6 +214,7 @@ func (c *Compactor) ownsTenant(tenant string) (v1.FingerprintBounds, bool, error
 
 // runs a single round of compaction for all relevant tenants and tables
 func (c *Compactor) runOne(ctx context.Context) error {
+	level.Info(c.logger).Log("msg", "running bloom compaction", "workers", c.cfg.WorkerParallelism)
 	var workersErr error
 	var wg sync.WaitGroup
 	ch := make(chan tenantTable)
@@ -226,7 +227,11 @@ func (c *Compactor) runOne(ctx context.Context) error {
 	err := c.loadWork(ctx, ch)
 
 	wg.Wait()
-	return multierror.New(workersErr, err, ctx.Err()).Err()
+	err = multierror.New(workersErr, err, ctx.Err()).Err()
+	if err != nil {
+		level.Error(c.logger).Log("msg", "compaction iteration failed", "err", err)
+	}
+	return err
 }
 
 func (c *Compactor) tables(ts time.Time) *dayRangeIterator {
@@ -241,6 +246,7 @@ func (c *Compactor) tables(ts time.Time) *dayRangeIterator {
 
 	fromDay := config.NewDayTime(model.TimeFromUnixNano(from))
 	throughDay := config.NewDayTime(model.TimeFromUnixNano(through))
+	level.Debug(c.logger).Log("msg", "loaded tables for compaction", "from", fromDay, "through", throughDay)
 	return newDayRangeIterator(fromDay, throughDay, c.schemaCfg)
 }
 
@@ -250,6 +256,8 @@ func (c *Compactor) loadWork(ctx context.Context, ch chan<- tenantTable) error {
 	for tables.Next() && tables.Err() == nil && ctx.Err() == nil {
 		table := tables.At()
 
+		level.Debug(c.logger).Log("msg", "loading work for table", "table", table)
+
 		tenants, err := c.tenants(ctx, table)
 		if err != nil {
 			return errors.Wrap(err, "getting tenants")
@@ -262,6 +270,7 @@ func (c *Compactor) loadWork(ctx context.Context, ch chan<- tenantTable) error {
 			if err != nil {
 				return errors.Wrap(err, "checking tenant ownership")
 			}
+			level.Debug(c.logger).Log("msg", "enqueueing work for tenant", "tenant", tenant, "table", table, "ownership", ownershipRange.String(), "owns", owns)
 			if !owns {
 				c.metrics.tenantsSkipped.Inc()
 				continue
@@ -280,12 +289,14 @@ func (c *Compactor) loadWork(ctx context.Context, ch chan<- tenantTable) error {
 		}
 
 		if err := tenants.Err(); err != nil {
+			level.Error(c.logger).Log("msg", "error iterating tenants", "err", err)
 			return errors.Wrap(err, "iterating tenants")
 		}
 
 	}
 
 	if err := tables.Err(); err != nil {
+		level.Error(c.logger).Log("msg", "error iterating tables", "err", err)
 		return errors.Wrap(err, "iterating tables")
 	}
 
@@ -330,7 +341,7 @@ func (c *Compactor) runWorkers(ctx context.Context, ch <-chan tenantTable) error
 }
 
 func (c *Compactor) compactTenantTable(ctx context.Context, tt tenantTable) error {
-	level.Info(c.logger).Log("msg", "compacting", "org_id", tt.tenant, "table", tt.table, "ownership", tt.ownershipRange)
+	level.Info(c.logger).Log("msg", "compacting", "org_id", tt.tenant, "table", tt.table, "ownership", tt.ownershipRange.String())
 	return c.controller.compactTenant(ctx, tt.table, tt.tenant, tt.ownershipRange)
 }
 
diff --git a/pkg/bloomcompactor/controller.go b/pkg/bloomcompactor/controller.go
index ef41ec2d8efbb..2a4ff6cd45242 100644
--- a/pkg/bloomcompactor/controller.go
+++ b/pkg/bloomcompactor/controller.go
@@ -70,7 +70,7 @@ func (s *SimpleBloomController) compactTenant(
 	tenant string,
 	ownershipRange v1.FingerprintBounds,
 ) error {
-	logger := log.With(s.logger, "ownership", ownershipRange, "org_id", tenant, "table", table.Addr())
+	logger := log.With(s.logger, "org_id", tenant, "table", table.Addr(), "ownership", ownershipRange.String())
 
 	client, err := s.bloomStore.Client(table.ModelTime())
 	if err != nil {
@@ -92,6 +92,15 @@ func (s *SimpleBloomController) compactTenant(
 		return errors.Wrap(err, "failed to get metas")
 	}
 
+	level.Debug(logger).Log("msg", "found relevant metas", "metas", len(metas))
+
+	// fetch all metas overlapping our ownership range so we can safely
+	// check which metas can be deleted even if they only partially overlap out ownership range
+	superset, err := s.fetchSuperSet(ctx, tenant, table, ownershipRange, metas, logger)
+	if err != nil {
+		return errors.Wrap(err, "failed to fetch superset")
+	}
+
 	// build compaction plans
 	work, err := s.findOutdatedGaps(ctx, tenant, table, ownershipRange, metas, logger)
 	if err != nil {
@@ -104,6 +113,63 @@ func (s *SimpleBloomController) compactTenant(
 		return errors.Wrap(err, "failed to build gaps")
 	}
 
+	// combine built and superset metas
+	// in preparation for removing outdated ones
+	combined := append(superset, built...)
+
+	outdated := outdatedMetas(combined)
+	level.Debug(logger).Log("msg", "found outdated metas", "outdated", len(outdated))
+
+	var (
+		deletedMetas  int
+		deletedBlocks int
+	)
+	defer func() {
+		s.metrics.metasDeleted.Add(float64(deletedMetas))
+		s.metrics.blocksDeleted.Add(float64(deletedBlocks))
+	}()
+
+	for _, meta := range outdated {
+		for _, block := range meta.Blocks {
+			err := client.DeleteBlocks(ctx, []bloomshipper.BlockRef{block})
+			if err != nil {
+				if client.IsObjectNotFoundErr(err) {
+					level.Debug(logger).Log("msg", "block not found while attempting delete, continuing", "block", block.String())
+				} else {
+					level.Error(logger).Log("msg", "failed to delete block", "err", err, "block", block.String())
+					return errors.Wrap(err, "failed to delete block")
+				}
+			}
+			deletedBlocks++
+			level.Debug(logger).Log("msg", "removed outdated block", "block", block.String())
+		}
+
+		err = client.DeleteMetas(ctx, []bloomshipper.MetaRef{meta.MetaRef})
+		if err != nil {
+			if client.IsObjectNotFoundErr(err) {
+				level.Debug(logger).Log("msg", "meta not found while attempting delete, continuing", "meta", meta.MetaRef.String())
+			} else {
+				level.Error(logger).Log("msg", "failed to delete meta", "err", err, "meta", meta.MetaRef.String())
+				return errors.Wrap(err, "failed to delete meta")
+			}
+		}
+		deletedMetas++
+		level.Debug(logger).Log("msg", "removed outdated meta", "meta", meta.MetaRef.String())
+	}
+
+	level.Debug(logger).Log("msg", "finished compaction")
+	return nil
+}
+
+// fetchSuperSet fetches all metas which overlap the ownership range of the first set of metas we've resolved
+func (s *SimpleBloomController) fetchSuperSet(
+	ctx context.Context,
+	tenant string,
+	table config.DayTable,
+	ownershipRange v1.FingerprintBounds,
+	metas []bloomshipper.Meta,
+	logger log.Logger,
+) ([]bloomshipper.Meta, error) {
 	// in order to delete outdates metas which only partially fall within the ownership range,
 	// we need to fetcha all metas in the entire bound range of the first set of metas we've resolved
 	/*
@@ -121,12 +187,28 @@ func (s *SimpleBloomController) compactTenant(
 		union := superset.Union(meta.Bounds)
 		if len(union) > 1 {
 			level.Error(logger).Log("msg", "meta bounds union is not a single range", "union", union)
-			return errors.New("meta bounds union is not a single range")
+			return nil, errors.New("meta bounds union is not a single range")
 		}
 		superset = union[0]
 	}
 
-	metas, err = s.bloomStore.FetchMetas(
+	within := superset.Within(ownershipRange)
+	level.Debug(logger).Log(
+		"msg", "looking for superset metas",
+		"superset", superset.String(),
+		"superset_within", within,
+	)
+
+	if within {
+		// we don't need to fetch any more metas
+		// NB(owen-d): here we copy metas into the output. This is slightly inefficient, but
+		// helps prevent mutability bugs by returning the same slice as the input.
+		results := make([]bloomshipper.Meta, len(metas))
+		copy(results, metas)
+		return results, nil
+	}
+
+	supersetMetas, err := s.bloomStore.FetchMetas(
 		ctx,
 		bloomshipper.MetaSearchParams{
 			TenantID: tenant,
@@ -134,42 +216,20 @@ func (s *SimpleBloomController) compactTenant(
 			Keyspace: superset,
 		},
 	)
+
 	if err != nil {
 		level.Error(logger).Log("msg", "failed to get meta superset range", "err", err, "superset", superset)
-		return errors.Wrap(err, "failed to get meta supseret range")
+		return nil, errors.Wrap(err, "failed to get meta supseret range")
 	}
 
-	// combine built and pre-existing metas
-	// in preparation for removing outdated metas
-	metas = append(metas, built...)
-
-	outdated := outdatedMetas(metas)
-	for _, meta := range outdated {
-		for _, block := range meta.Blocks {
-			if err := client.DeleteBlocks(ctx, []bloomshipper.BlockRef{block}); err != nil {
-				if client.IsObjectNotFoundErr(err) {
-					level.Debug(logger).Log("msg", "block not found while attempting delete, continuing", "block", block)
-					continue
-				}
-
-				level.Error(logger).Log("msg", "failed to delete blocks", "err", err)
-				return errors.Wrap(err, "failed to delete blocks")
-			}
-		}
-
-		if err := client.DeleteMetas(ctx, []bloomshipper.MetaRef{meta.MetaRef}); err != nil {
-			if client.IsObjectNotFoundErr(err) {
-				level.Debug(logger).Log("msg", "meta not found while attempting delete, continuing", "meta", meta.MetaRef)
-			} else {
-				level.Error(logger).Log("msg", "failed to delete metas", "err", err)
-				return errors.Wrap(err, "failed to delete metas")
-			}
-		}
-	}
-
-	level.Debug(logger).Log("msg", "finished compaction")
-	return nil
+	level.Debug(logger).Log(
+		"msg", "found superset metas",
+		"metas", len(metas),
+		"fresh_metas", len(supersetMetas),
+		"delta", len(supersetMetas)-len(metas),
+	)
+	return supersetMetas, nil
 }
 
 func (s *SimpleBloomController) findOutdatedGaps(
@@ -271,6 +331,7 @@ func (s *SimpleBloomController) buildGaps(
 
 		for i := range plan.gaps {
 			gap := plan.gaps[i]
+			logger := log.With(logger, "gap", gap.bounds.String(), "tsdb", plan.tsdb.Name())
 
 			meta := bloomshipper.Meta{
 				MetaRef: bloomshipper.MetaRef{
@@ -304,9 +365,11 @@ func (s *SimpleBloomController) buildGaps(
 				blocksIter,
 				s.rwFn,
 				s.metrics,
-				log.With(logger, "tsdb", plan.tsdb.Name(), "ownership", gap),
+				logger,
 			)
 
+			level.Debug(logger).Log("msg", "generating blocks", "overlapping_blocks", len(gap.blocks))
+
 			newBlocks := gen.Generate(ctx)
 			if err != nil {
 				level.Error(logger).Log("msg", "failed to generate bloom", "err", err)
@@ -333,6 +396,16 @@ func (s *SimpleBloomController) buildGaps(
 					blocksIter.Close()
 					return nil, errors.Wrap(err, "failed to write block")
 				}
+				s.metrics.blocksCreated.Inc()
+
+				totalGapKeyspace := (gap.bounds.Max - gap.bounds.Min)
+				progress := (built.Bounds.Max - gap.bounds.Min)
+				pct := float64(progress) / float64(totalGapKeyspace) * 100
+				level.Debug(logger).Log(
+					"msg", "uploaded block",
+					"block", built.BlockRef.String(),
+					"progress_pct", fmt.Sprintf("%.2f", pct),
+				)
 
 				meta.Blocks = append(meta.Blocks, built.BlockRef)
 			}
@@ -346,6 +419,7 @@ func (s *SimpleBloomController) buildGaps(
 			blocksIter.Close()
 
 			// Write the new meta
+			// TODO(owen-d): put total size in log, total time in metrics+log
 			ref, err := bloomshipper.MetaRefFrom(tenant, table.Addr(), gap.bounds, meta.Sources, meta.Blocks)
 			if err != nil {
 				level.Error(logger).Log("msg", "failed to checksum meta", "err", err)
@@ -357,8 +431,10 @@ func (s *SimpleBloomController) buildGaps(
 				level.Error(logger).Log("msg", "failed to write meta", "err", err)
 				return nil, errors.Wrap(err, "failed to write meta")
 			}
-			created = append(created, meta)
 
+			s.metrics.metasCreated.Inc()
+			level.Debug(logger).Log("msg", "uploaded meta", "meta", meta.MetaRef.String())
+			created = append(created, meta)
 			totalSeries += uint64(seriesItrWithCounter.Count())
 		}
 	}
diff --git a/pkg/bloomcompactor/metrics.go b/pkg/bloomcompactor/metrics.go
index 350e3ed7e480e..74378cb786429 100644
--- a/pkg/bloomcompactor/metrics.go
+++ b/pkg/bloomcompactor/metrics.go
@@ -31,6 +31,11 @@ type Metrics struct {
 	tenantsCompleted     *prometheus.CounterVec
 	tenantsCompletedTime *prometheus.HistogramVec
 	tenantsSeries        prometheus.Histogram
+
+	blocksCreated prometheus.Counter
+	blocksDeleted prometheus.Counter
+	metasCreated  prometheus.Counter
+	metasDeleted  prometheus.Counter
 }
 
 func NewMetrics(r prometheus.Registerer, bloomMetrics *v1.Metrics) *Metrics {
@@ -53,13 +58,13 @@ func NewMetrics(r prometheus.Registerer, bloomMetrics *v1.Metrics) *Metrics {
 		compactionsStarted: promauto.With(r).NewCounter(prometheus.CounterOpts{
 			Namespace: metricsNamespace,
 			Subsystem: metricsSubsystem,
-			Name:      "compactions_started",
+			Name:      "compactions_started_total",
 			Help:      "Total number of compactions started",
 		}),
 		compactionCompleted: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
 			Namespace: metricsNamespace,
 			Subsystem: metricsSubsystem,
-			Name:      "compactions_completed",
+			Name:      "compactions_completed_total",
 			Help:      "Total number of compactions completed",
 		}, []string{"status"}),
 		compactionTime: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{
@@ -73,7 +78,7 @@ func NewMetrics(r prometheus.Registerer, bloomMetrics *v1.Metrics) *Metrics {
 		tenantsDiscovered: promauto.With(r).NewCounter(prometheus.CounterOpts{
 			Namespace: metricsNamespace,
 			Subsystem: metricsSubsystem,
-			Name:      "tenants_discovered",
+			Name:      "tenants_discovered_total",
 			Help:      "Number of tenants discovered during the current compaction run",
 		}),
 		tenantsOwned: promauto.With(r).NewCounter(prometheus.CounterOpts{
@@ -85,19 +90,19 @@ func NewMetrics(r prometheus.Registerer, bloomMetrics *v1.Metrics) *Metrics {
 		tenantsSkipped: promauto.With(r).NewCounter(prometheus.CounterOpts{
 			Namespace: metricsNamespace,
 			Subsystem: metricsSubsystem,
-			Name:      "tenants_skipped",
+			Name:      "tenants_skipped_total",
 			Help:      "Number of tenants skipped since they are not owned by this instance",
 		}),
 		tenantsStarted: promauto.With(r).NewCounter(prometheus.CounterOpts{
 			Namespace: metricsNamespace,
 			Subsystem: metricsSubsystem,
-			Name:      "tenants_started",
+			Name:      "tenants_started_total",
 			Help:      "Number of tenants started to process during the current compaction run",
 		}),
 		tenantsCompleted: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
 			Namespace: metricsNamespace,
 			Subsystem: metricsSubsystem,
-			Name:      "tenants_completed",
+			Name:      "tenants_completed_total",
 			Help:      "Number of tenants successfully processed during the current compaction run",
 		}, []string{"status"}),
 		tenantsCompletedTime: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{
@@ -115,6 +120,30 @@ func NewMetrics(r prometheus.Registerer, bloomMetrics *v1.Metrics) *Metrics {
 			// Up to 10M series per tenant, way more than what we expect given our max_global_streams_per_user limits
 			Buckets: prometheus.ExponentialBucketsRange(1, 10000000, 10),
 		}),
+		blocksCreated: promauto.With(r).NewCounter(prometheus.CounterOpts{
+			Namespace: metricsNamespace,
+			Subsystem: metricsSubsystem,
+			Name:      "blocks_created_total",
+			Help:      "Number of blocks created",
+		}),
+		blocksDeleted: promauto.With(r).NewCounter(prometheus.CounterOpts{
+			Namespace: metricsNamespace,
+			Subsystem: metricsSubsystem,
+			Name:      "blocks_deleted_total",
+			Help:      "Number of blocks deleted",
+		}),
+		metasCreated: promauto.With(r).NewCounter(prometheus.CounterOpts{
+			Namespace: metricsNamespace,
+			Subsystem: metricsSubsystem,
+			Name:      "metas_created_total",
+			Help:      "Number of metas created",
+		}),
+		metasDeleted: promauto.With(r).NewCounter(prometheus.CounterOpts{
+			Namespace: metricsNamespace,
+			Subsystem: metricsSubsystem,
+			Name:      "metas_deleted_total",
+			Help:      "Number of metas deleted",
+		}),
 	}
 
 	return &m
diff --git a/pkg/bloomcompactor/spec.go b/pkg/bloomcompactor/spec.go
index 67d41b650e375..cb030dfb59131 100644
--- a/pkg/bloomcompactor/spec.go
+++ b/pkg/bloomcompactor/spec.go
@@ -138,7 +138,7 @@ func (s *SimpleBloomGenerator) Generate(ctx context.Context) v1.Iterator[*v1.Blo
 		)
 	}
 
-	return NewLazyBlockBuilderIterator(ctx, s.opts, s.populator(ctx), s.readWriterFn, series, s.blocksIter)
+	return NewLazyBlockBuilderIterator(ctx, s.opts, s.metrics, s.populator(ctx), s.readWriterFn, series, s.blocksIter)
 }
 
 // LazyBlockBuilderIterator is a lazy iterator over blocks that builds
@@ -146,6 +146,7 @@ func (s *SimpleBloomGenerator) Generate(ctx context.Context) v1.Iterator[*v1.Blo
 type LazyBlockBuilderIterator struct {
 	ctx          context.Context
 	opts         v1.BlockOptions
+	metrics      *Metrics
 	populate     func(*v1.Series, *v1.Bloom) error
 	readWriterFn func() (v1.BlockWriter, v1.BlockReader)
 	series       v1.PeekingIterator[*v1.Series]
@@ -158,6 +159,7 @@ type LazyBlockBuilderIterator struct {
 func NewLazyBlockBuilderIterator(
 	ctx context.Context,
 	opts v1.BlockOptions,
+	metrics *Metrics,
 	populate func(*v1.Series, *v1.Bloom) error,
 	readWriterFn func() (v1.BlockWriter, v1.BlockReader),
 	series v1.PeekingIterator[*v1.Series],
@@ -166,6 +168,7 @@ func NewLazyBlockBuilderIterator(
 	return &LazyBlockBuilderIterator{
 		ctx:          ctx,
 		opts:         opts,
+		metrics:      metrics,
 		populate:     populate,
 		readWriterFn: readWriterFn,
 		series:       series,
@@ -189,7 +192,7 @@ func (b *LazyBlockBuilderIterator) Next() bool {
 		return false
 	}
 
-	mergeBuilder := v1.NewMergeBuilder(b.blocks, b.series, b.populate)
+	mergeBuilder := v1.NewMergeBuilder(b.blocks, b.series, b.populate, b.metrics.bloomMetrics)
 	writer, reader := b.readWriterFn()
 	blockBuilder, err := v1.NewBlockBuilder(b.opts, writer)
 	if err != nil {
diff --git a/pkg/bloomcompactor/tsdb.go b/pkg/bloomcompactor/tsdb.go
index 6159ce02a804a..7f5ec5eab81a3 100644
--- a/pkg/bloomcompactor/tsdb.go
+++ b/pkg/bloomcompactor/tsdb.go
@@ -236,8 +236,7 @@ func NewTSDBStores(
 			if err != nil {
 				return nil, errors.Wrap(err, "failed to create object client")
 			}
-			prefix := path.Join(cfg.IndexTables.PathPrefix, cfg.IndexTables.Prefix)
-			res.stores[i] = NewBloomTSDBStore(storage.NewIndexStorageClient(c, prefix))
+			res.stores[i] = NewBloomTSDBStore(storage.NewIndexStorageClient(c, cfg.IndexTables.PathPrefix))
 		}
 	}
 
diff --git a/pkg/bloomgateway/util_test.go b/pkg/bloomgateway/util_test.go
index 6bc43cf794342..e9776dfef78f5 100644
--- a/pkg/bloomgateway/util_test.go
+++ b/pkg/bloomgateway/util_test.go
@@ -323,8 +323,7 @@ func createBlocks(t *testing.T, tenant string, n int, from, through model.Time,
 			MetaRef: bloomshipper.MetaRef{
 				Ref: ref,
 			},
-			BlockTombstones: []bloomshipper.BlockRef{},
-			Blocks:          []bloomshipper.BlockRef{blockRef},
+			Blocks: []bloomshipper.BlockRef{blockRef},
 		}
 		block, data, _ := v1.MakeBlock(t, n, fromFp, throughFp, from, through)
 		// Printing fingerprints and the log lines of its chunks comes handy for debugging...
diff --git a/pkg/storage/bloom/v1/builder.go b/pkg/storage/bloom/v1/builder.go
index d2d51b557e5d3..b094b847f2ef5 100644
--- a/pkg/storage/bloom/v1/builder.go
+++ b/pkg/storage/bloom/v1/builder.go
@@ -526,6 +526,7 @@ type MergeBuilder struct {
 	store Iterator[*Series]
 	// Add chunks to a bloom
 	populate func(*Series, *Bloom) error
+	metrics  *Metrics
 }
 
 // NewMergeBuilder is a specific builder which does the following:
@@ -536,11 +537,13 @@ func NewMergeBuilder(
 	blocks Iterator[*SeriesWithBloom],
 	store Iterator[*Series],
 	populate func(*Series, *Bloom) error,
+	metrics *Metrics,
 ) *MergeBuilder {
 	return &MergeBuilder{
 		blocks:   blocks,
 		store:    store,
 		populate: populate,
+		metrics:  metrics,
 	}
 }
 
@@ -568,6 +571,8 @@ func (mb *MergeBuilder) Build(builder *BlockBuilder) (uint32, error) {
 			nextInBlocks = deduped.At()
 		}
 
+		var chunksIndexed, chunksCopied int
+
 		cur := nextInBlocks
 		chunksToAdd := nextInStore.Chunks
 		// The next series from the store doesn't exist in the blocks, so we add it
@@ -583,8 +588,11 @@ func (mb *MergeBuilder) Build(builder *BlockBuilder) (uint32, error) {
 		} else {
 			// if the series already exists in the block, we only need to add the new chunks
 			chunksToAdd = nextInStore.Chunks.Unless(nextInBlocks.Series.Chunks)
+			chunksCopied = len(nextInStore.Chunks) - len(chunksToAdd)
 		}
 
+		chunksIndexed = len(chunksToAdd)
+
 		if len(chunksToAdd) > 0 {
 			if err := mb.populate(
 				&Series{
@@ -597,6 +605,9 @@ func (mb *MergeBuilder) Build(builder *BlockBuilder) (uint32, error) {
 			}
 		}
 
+		mb.metrics.chunksIndexed.WithLabelValues(chunkIndexedTypeIterated).Add(float64(chunksIndexed))
+		mb.metrics.chunksIndexed.WithLabelValues(chunkIndexedTypeCopied).Add(float64(chunksCopied))
+
 		blockFull, err := builder.AddSeries(*cur)
 		if err != nil {
 			return 0, errors.Wrap(err, "adding series to block")
@@ -606,6 +617,10 @@ func (mb *MergeBuilder) Build(builder *BlockBuilder) (uint32, error) {
 		}
 	}
 
+	if err := mb.store.Err(); err != nil {
+		return 0, errors.Wrap(err, "iterating store")
+	}
+
 	checksum, err := builder.Close()
 	if err != nil {
 		return 0, errors.Wrap(err, "closing block")
diff --git a/pkg/storage/bloom/v1/builder_test.go b/pkg/storage/bloom/v1/builder_test.go
index 0122a35f7751c..0013ad8744579 100644
--- a/pkg/storage/bloom/v1/builder_test.go
+++ b/pkg/storage/bloom/v1/builder_test.go
@@ -226,7 +226,7 @@ func TestMergeBuilder(t *testing.T) {
 	)
 
 	// Ensure that the merge builder combines all the blocks correctly
-	mergeBuilder := NewMergeBuilder(dedupedBlocks(blocks), storeItr, pop)
+	mergeBuilder := NewMergeBuilder(dedupedBlocks(blocks), storeItr, pop, NewMetrics(nil))
 	indexBuf := bytes.NewBuffer(nil)
 	bloomsBuf := bytes.NewBuffer(nil)
 	writer := NewMemoryBlockWriter(indexBuf, bloomsBuf)
@@ -400,6 +400,7 @@ func TestMergeBuilder_Roundtrip(t *testing.T) {
 			// We're not actually indexing new data in this test
 			return nil
 		},
+		NewMetrics(nil),
 	)
 	builder, err := NewBlockBuilder(DefaultBlockOptions, writer)
 	require.Nil(t, err)
diff --git a/pkg/storage/bloom/v1/index.go b/pkg/storage/bloom/v1/index.go
index e3a14dc5453ea..58d43b8cd0aca 100644
--- a/pkg/storage/bloom/v1/index.go
+++ b/pkg/storage/bloom/v1/index.go
@@ -234,8 +234,8 @@ func aggregateHeaders(xs []SeriesHeader) SeriesHeader {
 		Bounds: NewBounds(fromFp, throughFP),
 	}
 
-	for _, x := range xs {
-		if x.FromTs < res.FromTs {
+	for i, x := range xs {
+		if i == 0 || x.FromTs < res.FromTs {
 			res.FromTs = x.FromTs
 		}
 		if x.ThroughTs > res.ThroughTs {
diff --git a/pkg/storage/bloom/v1/metrics.go b/pkg/storage/bloom/v1/metrics.go
index aa604c29f1573..f5568a9d76596 100644
--- a/pkg/storage/bloom/v1/metrics.go
+++ b/pkg/storage/bloom/v1/metrics.go
@@ -10,12 +10,16 @@ type Metrics struct {
 	bloomSize          prometheus.Histogram // size of the bloom filter in bytes
 	hammingWeightRatio prometheus.Histogram // ratio of the hamming weight of the bloom filter to the number of bits in the bloom filter
 	estimatedCount     prometheus.Histogram // estimated number of elements in the bloom filter
+	chunksIndexed      *prometheus.CounterVec
 }
 
+const chunkIndexedTypeIterated = "iterated"
+const chunkIndexedTypeCopied = "copied"
+
 func NewMetrics(r prometheus.Registerer) *Metrics {
 	return &Metrics{
 		sbfCreationTime: promauto.With(r).NewCounter(prometheus.CounterOpts{
-			Name: "bloom_creation_time",
+			Name: "bloom_creation_time_total",
 			Help: "Time spent creating scalable bloom filters",
 		}),
 		bloomSize: promauto.With(r).NewHistogram(prometheus.HistogramOpts{
@@ -33,5 +37,9 @@ func NewMetrics(r prometheus.Registerer) *Metrics {
 			Help:    "Estimated number of elements in the bloom filter",
 			Buckets: prometheus.ExponentialBucketsRange(1, 33554432, 10),
 		}),
+		chunksIndexed: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
+			Name: "bloom_chunks_indexed_total",
+			Help: "Number of chunks indexed in bloom filters, partitioned by type. Type can be iterated or copied, where iterated indicates the chunk data was fetched and ngrams for it's contents generated whereas copied indicates the chunk already existed in another source block and was copied to the new block",
+		}, []string{"type"}),
 	}
 }
diff --git a/pkg/storage/stores/shipper/bloomshipper/client.go b/pkg/storage/stores/shipper/bloomshipper/client.go
index 882b0eab41c24..240f2b5166588 100644
--- a/pkg/storage/stores/shipper/bloomshipper/client.go
+++ b/pkg/storage/stores/shipper/bloomshipper/client.go
@@ -88,10 +88,6 @@ type Meta struct {
 	// The specific TSDB files used to generate the block.
 	Sources []tsdb.SingleTenantTSDBIdentifier
 
-	// TODO(owen-d): remove, unused
-	// Old blocks which can be deleted in the future. These should be from previous compaction rounds.
-	BlockTombstones []BlockRef
-
 	// A list of blocks that were generated
 	Blocks []BlockRef
 }
diff --git a/pkg/storage/stores/shipper/bloomshipper/client_test.go b/pkg/storage/stores/shipper/bloomshipper/client_test.go
index 897ed519946a7..e5bbe3b5b1bf5 100644
--- a/pkg/storage/stores/shipper/bloomshipper/client_test.go
+++ b/pkg/storage/stores/shipper/bloomshipper/client_test.go
@@ -63,8 +63,7 @@ func putMeta(c *BloomClient, tenant string, start model.Time, minFp, maxFp model
 				// EndTimestamp: start.Add(12 * time.Hour),
 			},
 		},
-		Blocks:          []BlockRef{},
-		BlockTombstones: []BlockRef{},
+		Blocks: []BlockRef{},
 	}
 	raw, _ := json.Marshal(meta)
 	return meta, c.client.PutObject(context.Background(), c.Meta(meta.MetaRef).Addr(), bytes.NewReader(raw))
@@ -129,8 +128,7 @@ func TestBloomClient_PutMeta(t *testing.T) {
 					// EndTimestamp: start.Add(12 * time.Hour),
 				},
 			},
-			Blocks:          []BlockRef{},
-			BlockTombstones: []BlockRef{},
+			Blocks: []BlockRef{},
 		}
 
 		err := c.PutMeta(ctx, meta)
diff --git a/pkg/storage/stores/shipper/bloomshipper/fetcher_test.go b/pkg/storage/stores/shipper/bloomshipper/fetcher_test.go
index 40a695e0b8e6c..962bebb9956fd 100644
--- a/pkg/storage/stores/shipper/bloomshipper/fetcher_test.go
+++ b/pkg/storage/stores/shipper/bloomshipper/fetcher_test.go
@@ -34,8 +34,7 @@ func makeMetas(t *testing.T, schemaCfg config.SchemaConfig, ts model.Time, keysp
 					EndTimestamp:   ts,
 				},
 			},
-			BlockTombstones: []BlockRef{},
-			Blocks:          []BlockRef{},
+			Blocks: []BlockRef{},
 		}
 	}
 	return metas
diff --git a/pkg/storage/stores/shipper/bloomshipper/resolver.go b/pkg/storage/stores/shipper/bloomshipper/resolver.go
index 40a59cee42dbc..7d224b9f01392 100644
--- a/pkg/storage/stores/shipper/bloomshipper/resolver.go
+++ b/pkg/storage/stores/shipper/bloomshipper/resolver.go
@@ -14,6 +14,9 @@ const (
 	BloomPrefix  = "bloom"
 	MetasPrefix  = "metas"
 	BlocksPrefix = "blocks"
+
+	extTarGz = ".tar.gz"
+	extJSON  = ".json"
 )
 
 // KeyResolver is an interface for resolving keys to locations.
@@ -36,7 +39,7 @@ func (defaultKeyResolver) Meta(ref MetaRef) Location {
 		fmt.Sprintf("%v", ref.TableName),
 		ref.TenantID,
 		MetasPrefix,
-		fmt.Sprintf("%v-%v", ref.Bounds, ref.Checksum),
+		fmt.Sprintf("%v-%x%s", ref.Bounds, ref.Checksum, extJSON),
 	}
 }
 
@@ -50,7 +53,8 @@ func (defaultKeyResolver) ParseMetaKey(loc Location) (MetaRef, error) {
 	if err != nil {
 		return MetaRef{}, fmt.Errorf("failed to parse bounds of meta key %s : %w", loc, err)
 	}
-	checksum, err := strconv.ParseUint(fnParts[2], 16, 64)
+	withoutExt := strings.TrimSuffix(fnParts[2], extJSON)
+	checksum, err := strconv.ParseUint(withoutExt, 16, 64)
 	if err != nil {
 		return MetaRef{}, fmt.Errorf("failed to parse checksum of meta key %s : %w", loc, err)
 	}
@@ -77,7 +81,7 @@ func (defaultKeyResolver) Block(ref BlockRef) Location {
 		ref.TenantID,
 		BlocksPrefix,
 		ref.Bounds.String(),
-		fmt.Sprintf("%d-%d-%x", ref.StartTimestamp, ref.EndTimestamp, ref.Checksum),
+		fmt.Sprintf("%d-%d-%x%s", ref.StartTimestamp, ref.EndTimestamp, ref.Checksum, extTarGz),
 	}
 }
 
diff --git a/pkg/storage/stores/shipper/bloomshipper/shipper.go b/pkg/storage/stores/shipper/bloomshipper/shipper.go
index fd755b0a204a7..3267886ac063e 100644
--- a/pkg/storage/stores/shipper/bloomshipper/shipper.go
+++ b/pkg/storage/stores/shipper/bloomshipper/shipper.go
@@ -55,30 +55,15 @@ func (s *Shipper) Stop() {
 }
 
 // BlocksForMetas returns all the blocks from all the metas listed that are within the requested bounds
-// and not tombstoned in any of the metas
-func BlocksForMetas(metas []Meta, interval Interval, keyspaces []v1.FingerprintBounds) []BlockRef {
-	blocks := make(map[BlockRef]bool) // block -> isTombstoned
-
+func BlocksForMetas(metas []Meta, interval Interval, keyspaces []v1.FingerprintBounds) (refs []BlockRef) {
 	for _, meta := range metas {
-		for _, tombstone := range meta.BlockTombstones {
-			blocks[tombstone] = true
-		}
 		for _, block := range meta.Blocks {
-			tombstoned, ok := blocks[block]
-			if ok && tombstoned {
-				// skip tombstoned blocks
-				continue
+			if !isOutsideRange(block, interval, keyspaces) {
+				refs = append(refs, block)
 			}
-			blocks[block] = false
 		}
 	}
 
-	refs := make([]BlockRef, 0, len(blocks))
-	for ref, tombstoned := range blocks {
-		if !tombstoned && !isOutsideRange(ref, interval, keyspaces) {
-			refs = append(refs, ref)
-		}
-	}
 	sort.Slice(refs, func(i, j int) bool {
 		return refs[i].Bounds.Less(refs[j].Bounds)
 	})
diff --git a/pkg/storage/stores/shipper/bloomshipper/shipper_test.go b/pkg/storage/stores/shipper/bloomshipper/shipper_test.go
index c9e47f91fea28..e03d72c26ba37 100644
--- a/pkg/storage/stores/shipper/bloomshipper/shipper_test.go
+++ b/pkg/storage/stores/shipper/bloomshipper/shipper_test.go
@@ -14,49 +14,6 @@ import (
 )
 
 func TestBloomShipper_findBlocks(t *testing.T) {
-	t.Run("expected block that are specified in tombstones to be filtered out", func(t *testing.T) {
-		metas := []Meta{
-			{
-				Blocks: []BlockRef{
-					//this blockRef is marked as deleted in the next meta
-					createMatchingBlockRef(1),
-					createMatchingBlockRef(2),
-				},
-			},
-			{
-				Blocks: []BlockRef{
-					//this blockRef is marked as deleted in the next meta
-					createMatchingBlockRef(3),
-					createMatchingBlockRef(4),
-				},
-			},
-			{
-				BlockTombstones: []BlockRef{
-					createMatchingBlockRef(1),
-					createMatchingBlockRef(3),
-				},
-				Blocks: []BlockRef{
-					createMatchingBlockRef(5),
-				},
-			},
-		}
-
-		ts := model.Now()
-
-		interval := NewInterval(
-			ts.Add(-2*time.Hour),
-			ts.Add(-1*time.Hour),
-		)
-		blocks := BlocksForMetas(metas, interval, []v1.FingerprintBounds{{Min: 100, Max: 200}})
-
-		expectedBlockRefs := []BlockRef{
-			createMatchingBlockRef(2),
-			createMatchingBlockRef(4),
-			createMatchingBlockRef(5),
-		}
-		require.ElementsMatch(t, expectedBlockRefs, blocks)
-	})
-
 	tests := map[string]struct {
 		minFingerprint uint64
 		maxFingerprint uint64
diff --git a/pkg/storage/stores/shipper/bloomshipper/store_test.go b/pkg/storage/stores/shipper/bloomshipper/store_test.go
index ca86cb94fa963..c99aa46df4bf3 100644
--- a/pkg/storage/stores/shipper/bloomshipper/store_test.go
+++ b/pkg/storage/stores/shipper/bloomshipper/store_test.go
@@ -83,8 +83,7 @@ func createMetaInStorage(store *BloomStore, tenant string, start model.Time, min
 				// EndTimestamp: start.Add(12 * time.Hour),
 			},
 		},
-		Blocks:          []BlockRef{},
-		BlockTombstones: []BlockRef{},
+		Blocks: []BlockRef{},
 	}
 	err := store.storeDo(start, func(s *bloomStoreEntry) error {
 		raw, _ := json.Marshal(meta)
diff --git a/pkg/validation/limits.go b/pkg/validation/limits.go
index 9627718aa8ec7..00ee2e152144a 100644
--- a/pkg/validation/limits.go
+++ b/pkg/validation/limits.go
@@ -339,7 +339,12 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
 	f.IntVar(&l.BloomGatewayBlocksDownloadingParallelism, "bloom-gateway.blocks-downloading-parallelism", 50, "Maximum number of blocks will be downloaded in parallel by the Bloom Gateway.")
 	f.DurationVar(&l.BloomGatewayCacheKeyInterval, "bloom-gateway.cache-key-interval", 15*time.Minute, "Interval for computing the cache key in the Bloom Gateway.")
 	_ = l.BloomCompactorMaxBlockSize.Set(defaultBloomCompactorMaxBlockSize)
-	f.Var(&l.BloomCompactorMaxBlockSize, "bloom-compactor.max-block-size", "The maximum bloom block size. A value of 0 sets an unlimited size. Default is 200MB. The actual block size might exceed this limit since blooms will be added to blocks until the block exceeds the maximum block size.")
+	f.Var(&l.BloomCompactorMaxBlockSize, "bloom-compactor.max-block-size",
+		fmt.Sprintf(
+			"The maximum bloom block size. A value of 0 sets an unlimited size. Default is %s. The actual block size might exceed this limit since blooms will be added to blocks until the block exceeds the maximum block size.",
+			defaultBloomCompactorMaxBlockSize,
+		),
+	)
 
 	l.ShardStreams = &shardstreams.Config{}
 	l.ShardStreams.RegisterFlagsWithPrefix("shard-streams", f)