Skip to content

Commit

Permalink
metrics for block series iterated + safer block iteration
Browse files Browse the repository at this point in the history
Signed-off-by: Owen Diehl <[email protected]>
  • Loading branch information
owen-d committed Feb 21, 2024
1 parent 1662298 commit 077b0af
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 17 deletions.
27 changes: 15 additions & 12 deletions pkg/storage/bloom/v1/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -549,10 +549,16 @@ func NewMergeBuilder(

func (mb *MergeBuilder) Build(builder *BlockBuilder) (uint32, error) {
var (
nextInBlocks *SeriesWithBloom
nextInBlocks *SeriesWithBloom
blocksFinished bool
blockSeriesIterated, chunksIndexed, chunksCopied int
)

deduped := mb.blocks
defer func() {
mb.metrics.blockSeriesIterated.Add(float64(blockSeriesIterated))
mb.metrics.chunksIndexed.WithLabelValues(chunkIndexedTypeIterated).Add(float64(chunksIndexed))
mb.metrics.chunksIndexed.WithLabelValues(chunkIndexedTypeCopied).Add(float64(chunksCopied))
}()

for mb.store.Next() {
nextInStore := mb.store.At()
Expand All @@ -562,17 +568,17 @@ func (mb *MergeBuilder) Build(builder *BlockBuilder) (uint32, error) {
// TODO(owen-d): expensive, but Seek is not implemented for this iterator.
// It would also be more efficient to build an iterator over the Series file in the index
// without the blooms, until we find a bloom we actually need to unpack from the blooms file.
for nextInBlocks == nil || nextInBlocks.Series.Fingerprint < mb.store.At().Fingerprint {
if !deduped.Next() {
for !blocksFinished && (nextInBlocks == nil || nextInBlocks.Series.Fingerprint < mb.store.At().Fingerprint) {
if !mb.blocks.Next() {
// we've exhausted all the blocks
blocksFinished = true
nextInBlocks = nil
break
}
nextInBlocks = deduped.At()
blockSeriesIterated++
nextInBlocks = mb.blocks.At()
}

var chunksIndexed, chunksCopied int

cur := nextInBlocks
chunksToAdd := nextInStore.Chunks
// The next series from the store doesn't exist in the blocks, so we add it
Expand All @@ -588,10 +594,10 @@ func (mb *MergeBuilder) Build(builder *BlockBuilder) (uint32, error) {
} else {
// if the series already exists in the block, we only need to add the new chunks
chunksToAdd = nextInStore.Chunks.Unless(nextInBlocks.Series.Chunks)
chunksCopied = len(nextInStore.Chunks) - len(chunksToAdd)
chunksCopied += len(nextInStore.Chunks) - len(chunksToAdd)
}

chunksIndexed = len(chunksToAdd)
chunksIndexed += len(chunksToAdd)

if len(chunksToAdd) > 0 {
if err := mb.populate(
Expand All @@ -605,9 +611,6 @@ func (mb *MergeBuilder) Build(builder *BlockBuilder) (uint32, error) {
}
}

mb.metrics.chunksIndexed.WithLabelValues(chunkIndexedTypeIterated).Add(float64(chunksIndexed))
mb.metrics.chunksIndexed.WithLabelValues(chunkIndexedTypeCopied).Add(float64(chunksCopied))

blockFull, err := builder.AddSeries(*cur)
if err != nil {
return 0, errors.Wrap(err, "adding series to block")
Expand Down
15 changes: 10 additions & 5 deletions pkg/storage/bloom/v1/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@ import (
)

type Metrics struct {
sbfCreationTime prometheus.Counter // time spent creating sbfs
bloomSize prometheus.Histogram // size of the bloom filter in bytes
hammingWeightRatio prometheus.Histogram // ratio of the hamming weight of the bloom filter to the number of bits in the bloom filter
estimatedCount prometheus.Histogram // estimated number of elements in the bloom filter
chunksIndexed *prometheus.CounterVec
sbfCreationTime prometheus.Counter // time spent creating sbfs
bloomSize prometheus.Histogram // size of the bloom filter in bytes
hammingWeightRatio prometheus.Histogram // ratio of the hamming weight of the bloom filter to the number of bits in the bloom filter
estimatedCount prometheus.Histogram // estimated number of elements in the bloom filter
chunksIndexed *prometheus.CounterVec
blockSeriesIterated prometheus.Counter
}

const chunkIndexedTypeIterated = "iterated"
Expand Down Expand Up @@ -41,5 +42,9 @@ func NewMetrics(r prometheus.Registerer) *Metrics {
Name: "bloom_chunks_indexed_total",
Help: "Number of chunks indexed in bloom filters, partitioned by type. Type can be iterated or copied, where iterated indicates the chunk data was fetched and ngrams for it's contents generated whereas copied indicates the chunk already existed in another source block and was copied to the new block",
}, []string{"type"}),
blockSeriesIterated: promauto.With(r).NewCounter(prometheus.CounterOpts{
Name: "bloom_block_series_iterated_total",
Help: "Number of series iterated in existing blocks while generating new blocks",
}),
}
}

0 comments on commit 077b0af

Please sign in to comment.