chunks iterated vs skipped metrics
Signed-off-by: Owen Diehl <[email protected]>
owen-d committed Feb 18, 2024
1 parent 2d09d20 commit 417a07d
Showing 4 changed files with 26 additions and 3 deletions.
7 changes: 5 additions & 2 deletions pkg/bloomcompactor/spec.go
@@ -138,14 +138,15 @@ func (s *SimpleBloomGenerator) Generate(ctx context.Context) v1.Iterator[*v1.Blo
)
}

- return NewLazyBlockBuilderIterator(ctx, s.opts, s.populator(ctx), s.readWriterFn, series, s.blocksIter)
+ return NewLazyBlockBuilderIterator(ctx, s.opts, s.metrics, s.populator(ctx), s.readWriterFn, series, s.blocksIter)
}

// LazyBlockBuilderIterator is a lazy iterator over blocks that builds
// each block by adding series to them until they are full.
type LazyBlockBuilderIterator struct {
ctx context.Context
opts v1.BlockOptions
+ metrics *Metrics
populate func(*v1.Series, *v1.Bloom) error
readWriterFn func() (v1.BlockWriter, v1.BlockReader)
series v1.PeekingIterator[*v1.Series]
@@ -158,6 +159,7 @@ type LazyBlockBuilderIterator struct {
func NewLazyBlockBuilderIterator(
ctx context.Context,
opts v1.BlockOptions,
+ metrics *Metrics,
populate func(*v1.Series, *v1.Bloom) error,
readWriterFn func() (v1.BlockWriter, v1.BlockReader),
series v1.PeekingIterator[*v1.Series],
@@ -166,6 +168,7 @@ func NewLazyBlockBuilderIterator(
return &LazyBlockBuilderIterator{
ctx: ctx,
opts: opts,
+ metrics: metrics,
populate: populate,
readWriterFn: readWriterFn,
series: series,
@@ -189,7 +192,7 @@ func (b *LazyBlockBuilderIterator) Next() bool {
return false
}

- mergeBuilder := v1.NewMergeBuilder(b.blocks, b.series, b.populate)
+ mergeBuilder := v1.NewMergeBuilder(b.blocks, b.series, b.populate, b.metrics.bloomMetrics)
writer, reader := b.readWriterFn()
blockBuilder, err := v1.NewBlockBuilder(b.opts, writer)
if err != nil {
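The compactor-level Metrics type itself is not part of this diff. Judging from the b.metrics.bloomMetrics argument passed to v1.NewMergeBuilder above, its assumed shape is roughly the following sketch (the field name is taken from the call site; everything else is hypothetical):

// Hypothetical sketch: the bloomcompactor Metrics struct is assumed to
// carry the v1 bloom metrics so the merge builder can record chunk counts.
type Metrics struct {
	bloomMetrics *v1.Metrics // referenced above as b.metrics.bloomMetrics
	// ...other compactor metrics elided
}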
11 changes: 11 additions & 0 deletions pkg/storage/bloom/v1/builder.go
@@ -526,6 +526,7 @@ type MergeBuilder struct {
store Iterator[*Series]
// Add chunks to a bloom
populate func(*Series, *Bloom) error
+ metrics *Metrics
}

// NewMergeBuilder is a specific builder which does the following:
@@ -536,11 +537,13 @@ func NewMergeBuilder(
blocks Iterator[*SeriesWithBloom],
store Iterator[*Series],
populate func(*Series, *Bloom) error,
+ metrics *Metrics,
) *MergeBuilder {
return &MergeBuilder{
blocks: blocks,
store: store,
populate: populate,
+ metrics: metrics,
}
}

@@ -568,6 +571,8 @@ func (mb *MergeBuilder) Build(builder *BlockBuilder) (uint32, error) {
nextInBlocks = deduped.At()
}

+ var chunksIndexed, chunksCopied int
+
cur := nextInBlocks
chunksToAdd := nextInStore.Chunks
// The next series from the store doesn't exist in the blocks, so we add it
@@ -583,8 +588,11 @@
} else {
// if the series already exists in the block, we only need to add the new chunks
chunksToAdd = nextInStore.Chunks.Unless(nextInBlocks.Series.Chunks)
+ chunksCopied = len(nextInStore.Chunks) - len(chunksToAdd)
}

+ chunksIndexed = len(chunksToAdd)
+
if len(chunksToAdd) > 0 {
if err := mb.populate(
&Series{
@@ -597,6 +605,9 @@ func (mb *MergeBuilder) Build(builder *BlockBuilder) (uint32, error) {
}
}

+ mb.metrics.chunksIndexed.WithLabelValues(chunkIndexedTypeIterated).Add(float64(chunksIndexed))
+ mb.metrics.chunksIndexed.WithLabelValues(chunkIndexedTypeCopied).Add(float64(chunksCopied))
+
blockFull, err := builder.AddSeries(*cur)
if err != nil {
return 0, errors.Wrap(err, "adding series to block")
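For intuition about the new accounting, here is a self-contained sketch of the iterated-vs-copied split. Unless is reimplemented over plain strings purely for illustration; Loki's ChunkRefs type and its method are not reproduced here:

package main

import "fmt"

// unless returns the chunks in store that are absent from blocks,
// mirroring the set-difference semantics used by the merge builder.
func unless(store, blocks []string) []string {
	seen := make(map[string]struct{}, len(blocks))
	for _, c := range blocks {
		seen[c] = struct{}{}
	}
	var out []string
	for _, c := range store {
		if _, ok := seen[c]; !ok {
			out = append(out, c)
		}
	}
	return out
}

func main() {
	store := []string{"c1", "c2", "c3"} // chunks the store knows for a series
	blocks := []string{"c1", "c2"}      // chunks already indexed in a source block
	chunksToAdd := unless(store, blocks)
	chunksIndexed := len(chunksToAdd)          // "iterated": must be fetched and tokenized
	chunksCopied := len(store) - chunksIndexed // "copied": reused from the existing block
	fmt.Println(chunksIndexed, chunksCopied)   // prints: 1 2
}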
3 changes: 2 additions & 1 deletion pkg/storage/bloom/v1/builder_test.go
@@ -226,7 +226,7 @@ func TestMergeBuilder(t *testing.T) {
)

// Ensure that the merge builder combines all the blocks correctly
- mergeBuilder := NewMergeBuilder(dedupedBlocks(blocks), storeItr, pop)
+ mergeBuilder := NewMergeBuilder(dedupedBlocks(blocks), storeItr, pop, NewMetrics(nil))
indexBuf := bytes.NewBuffer(nil)
bloomsBuf := bytes.NewBuffer(nil)
writer := NewMemoryBlockWriter(indexBuf, bloomsBuf)
@@ -400,6 +400,7 @@ func TestMergeBuilder_Roundtrip(t *testing.T) {
// We're not actually indexing new data in this test
return nil
},
+ NewMetrics(nil),
)
builder, err := NewBlockBuilder(DefaultBlockOptions, writer)
require.Nil(t, err)
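Passing NewMetrics(nil) in these tests works because promauto.With accepts a nil Registerer, in which case the returned metrics are created but never registered, so repeated test runs cannot hit duplicate-registration errors.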
8 changes: 8 additions & 0 deletions pkg/storage/bloom/v1/metrics.go
@@ -10,8 +10,12 @@ type Metrics struct {
bloomSize prometheus.Histogram // size of the bloom filter in bytes
hammingWeightRatio prometheus.Histogram // ratio of the hamming weight of the bloom filter to the number of bits in the bloom filter
estimatedCount prometheus.Histogram // estimated number of elements in the bloom filter
+ chunksIndexed *prometheus.CounterVec
}

+ const chunkIndexedTypeIterated = "iterated"
+ const chunkIndexedTypeCopied = "copied"
+
func NewMetrics(r prometheus.Registerer) *Metrics {
return &Metrics{
sbfCreationTime: promauto.With(r).NewCounter(prometheus.CounterOpts{
@@ -33,5 +37,9 @@ func NewMetrics(r prometheus.Registerer) *Metrics {
Help: "Estimated number of elements in the bloom filter",
Buckets: prometheus.ExponentialBucketsRange(1, 33554432, 10),
}),
+ chunksIndexed: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
+ Name: "bloom_chunks_indexed",
+ Help: "Number of chunks indexed in bloom filters, partitioned by type. Type can be iterated or copied: iterated indicates the chunk data was fetched and ngrams for its contents generated, whereas copied indicates the chunk already existed in another source block and was copied to the new block.",
+ }, []string{"type"}),
}
}
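A minimal usage sketch from inside package v1 (the field and the label constants are unexported), assuming a throwaway registry and the standard prometheus import:

// Illustrative only: exercise the new counter against a local registry.
func ExampleNewMetrics() {
	reg := prometheus.NewRegistry()
	m := NewMetrics(reg)
	m.chunksIndexed.WithLabelValues(chunkIndexedTypeIterated).Add(3)
	m.chunksIndexed.WithLabelValues(chunkIndexedTypeCopied).Add(1)
	// Exposed as bloom_chunks_indexed{type="iterated"} and {type="copied"}.
}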
