From 417a07d26c441fb812566b8102d480e5eb4835aa Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Sun, 18 Feb 2024 13:13:12 -0800 Subject: [PATCH] chunks iterated vs skipped metrics Signed-off-by: Owen Diehl --- pkg/bloomcompactor/spec.go | 7 +++++-- pkg/storage/bloom/v1/builder.go | 11 +++++++++++ pkg/storage/bloom/v1/builder_test.go | 3 ++- pkg/storage/bloom/v1/metrics.go | 8 ++++++++ 4 files changed, 26 insertions(+), 3 deletions(-) diff --git a/pkg/bloomcompactor/spec.go b/pkg/bloomcompactor/spec.go index 67d41b650e375..cb030dfb59131 100644 --- a/pkg/bloomcompactor/spec.go +++ b/pkg/bloomcompactor/spec.go @@ -138,7 +138,7 @@ func (s *SimpleBloomGenerator) Generate(ctx context.Context) v1.Iterator[*v1.Blo ) } - return NewLazyBlockBuilderIterator(ctx, s.opts, s.populator(ctx), s.readWriterFn, series, s.blocksIter) + return NewLazyBlockBuilderIterator(ctx, s.opts, s.metrics, s.populator(ctx), s.readWriterFn, series, s.blocksIter) } // LazyBlockBuilderIterator is a lazy iterator over blocks that builds @@ -146,6 +146,7 @@ func (s *SimpleBloomGenerator) Generate(ctx context.Context) v1.Iterator[*v1.Blo type LazyBlockBuilderIterator struct { ctx context.Context opts v1.BlockOptions + metrics *Metrics populate func(*v1.Series, *v1.Bloom) error readWriterFn func() (v1.BlockWriter, v1.BlockReader) series v1.PeekingIterator[*v1.Series] @@ -158,6 +159,7 @@ type LazyBlockBuilderIterator struct { func NewLazyBlockBuilderIterator( ctx context.Context, opts v1.BlockOptions, + metrics *Metrics, populate func(*v1.Series, *v1.Bloom) error, readWriterFn func() (v1.BlockWriter, v1.BlockReader), series v1.PeekingIterator[*v1.Series], @@ -166,6 +168,7 @@ func NewLazyBlockBuilderIterator( return &LazyBlockBuilderIterator{ ctx: ctx, opts: opts, + metrics: metrics, populate: populate, readWriterFn: readWriterFn, series: series, @@ -189,7 +192,7 @@ func (b *LazyBlockBuilderIterator) Next() bool { return false } - mergeBuilder := v1.NewMergeBuilder(b.blocks, b.series, b.populate) + 
mergeBuilder := v1.NewMergeBuilder(b.blocks, b.series, b.populate, b.metrics.bloomMetrics) writer, reader := b.readWriterFn() blockBuilder, err := v1.NewBlockBuilder(b.opts, writer) if err != nil { diff --git a/pkg/storage/bloom/v1/builder.go b/pkg/storage/bloom/v1/builder.go index c016cf651174c..b094b847f2ef5 100644 --- a/pkg/storage/bloom/v1/builder.go +++ b/pkg/storage/bloom/v1/builder.go @@ -526,6 +526,7 @@ type MergeBuilder struct { store Iterator[*Series] // Add chunks to a bloom populate func(*Series, *Bloom) error + metrics *Metrics } // NewMergeBuilder is a specific builder which does the following: @@ -536,11 +537,13 @@ func NewMergeBuilder( blocks Iterator[*SeriesWithBloom], store Iterator[*Series], populate func(*Series, *Bloom) error, + metrics *Metrics, ) *MergeBuilder { return &MergeBuilder{ blocks: blocks, store: store, populate: populate, + metrics: metrics, } } @@ -568,6 +571,8 @@ func (mb *MergeBuilder) Build(builder *BlockBuilder) (uint32, error) { nextInBlocks = deduped.At() } + var chunksIndexed, chunksCopied int + cur := nextInBlocks chunksToAdd := nextInStore.Chunks // The next series from the store doesn't exist in the blocks, so we add it @@ -583,8 +588,11 @@ func (mb *MergeBuilder) Build(builder *BlockBuilder) (uint32, error) { } else { // if the series already exists in the block, we only need to add the new chunks chunksToAdd = nextInStore.Chunks.Unless(nextInBlocks.Series.Chunks) + chunksCopied = len(nextInStore.Chunks) - len(chunksToAdd) } + chunksIndexed = len(chunksToAdd) + if len(chunksToAdd) > 0 { if err := mb.populate( &Series{ @@ -597,6 +605,9 @@ func (mb *MergeBuilder) Build(builder *BlockBuilder) (uint32, error) { } } + mb.metrics.chunksIndexed.WithLabelValues(chunkIndexedTypeIterated).Add(float64(chunksIndexed)) + mb.metrics.chunksIndexed.WithLabelValues(chunkIndexedTypeCopied).Add(float64(chunksCopied)) + blockFull, err := builder.AddSeries(*cur) if err != nil { return 0, errors.Wrap(err, "adding series to block") diff --git 
a/pkg/storage/bloom/v1/builder_test.go b/pkg/storage/bloom/v1/builder_test.go index 0122a35f7751c..0013ad8744579 100644 --- a/pkg/storage/bloom/v1/builder_test.go +++ b/pkg/storage/bloom/v1/builder_test.go @@ -226,7 +226,7 @@ func TestMergeBuilder(t *testing.T) { ) // Ensure that the merge builder combines all the blocks correctly - mergeBuilder := NewMergeBuilder(dedupedBlocks(blocks), storeItr, pop) + mergeBuilder := NewMergeBuilder(dedupedBlocks(blocks), storeItr, pop, NewMetrics(nil)) indexBuf := bytes.NewBuffer(nil) bloomsBuf := bytes.NewBuffer(nil) writer := NewMemoryBlockWriter(indexBuf, bloomsBuf) @@ -400,6 +400,7 @@ func TestMergeBuilder_Roundtrip(t *testing.T) { // We're not actually indexing new data in this test return nil }, + NewMetrics(nil), ) builder, err := NewBlockBuilder(DefaultBlockOptions, writer) require.Nil(t, err) diff --git a/pkg/storage/bloom/v1/metrics.go b/pkg/storage/bloom/v1/metrics.go index aa604c29f1573..bcda8186db55f 100644 --- a/pkg/storage/bloom/v1/metrics.go +++ b/pkg/storage/bloom/v1/metrics.go @@ -10,8 +10,12 @@ type Metrics struct { bloomSize prometheus.Histogram // size of the bloom filter in bytes hammingWeightRatio prometheus.Histogram // ratio of the hamming weight of the bloom filter to the number of bits in the bloom filter estimatedCount prometheus.Histogram // estimated number of elements in the bloom filter + chunksIndexed *prometheus.CounterVec } +const chunkIndexedTypeIterated = "iterated" +const chunkIndexedTypeCopied = "copied" + func NewMetrics(r prometheus.Registerer) *Metrics { return &Metrics{ sbfCreationTime: promauto.With(r).NewCounter(prometheus.CounterOpts{ @@ -33,5 +37,9 @@ func NewMetrics(r prometheus.Registerer) *Metrics { Help: "Estimated number of elements in the bloom filter", Buckets: prometheus.ExponentialBucketsRange(1, 33554432, 10), }), + chunksIndexed: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ + Name: "bloom_chunks_indexed", + Help: "Number of chunks indexed in bloom filters, 
partitioned by type. Type can be iterated or copied, where iterated indicates the chunk data was fetched and ngrams for its contents generated whereas copied indicates the chunk already existed in another source block and was copied to the new block", + }, []string{"type"}), } }