feat: Do not add empty blooms to offsets (#14577)
salvacorts authored Oct 25, 2024
1 parent 5824e3d commit 51c42e8
Showing 13 changed files with 260 additions and 32 deletions.
47 changes: 30 additions & 17 deletions integration/bloom_building_test.go
@@ -61,15 +61,7 @@ func TestBloomBuilding(t *testing.T) {
cliIngester.Now = now

// We now ingest some logs across many series.
series := make([]labels.Labels, 0, nSeries)
for i := 0; i < nSeries; i++ {
lbs := labels.FromStrings("job", fmt.Sprintf("job-%d", i))
series = append(series, lbs)

for j := 0; j < nLogsPerSeries; j++ {
require.NoError(t, cliDistributor.PushLogLine(fmt.Sprintf("log line %d", j), now, nil, lbs.Map()))
}
}
series := writeSeries(t, nSeries, nLogsPerSeries, cliDistributor, now, "job")

// restart ingester which should flush the chunks and index
require.NoError(t, tIngester.Restart())
@@ -124,14 +116,8 @@ func TestBloomBuilding(t *testing.T) {
checkSeriesInBlooms(t, now, tenantID, bloomStore, series)

// Push some more logs so TSDBs need to be updated.
for i := 0; i < nSeries; i++ {
lbs := labels.FromStrings("job", fmt.Sprintf("job-new-%d", i))
series = append(series, lbs)

for j := 0; j < nLogsPerSeries; j++ {
require.NoError(t, cliDistributor.PushLogLine(fmt.Sprintf("log line %d", j), now, nil, lbs.Map()))
}
}
newSeries := writeSeries(t, nSeries, nLogsPerSeries, cliDistributor, now, "job-new")
series = append(series, newSeries...)

// restart ingester which should flush the chunks and index
require.NoError(t, tIngester.Restart())
@@ -147,6 +133,33 @@ func TestBloomBuilding(t *testing.T) {
checkSeriesInBlooms(t, now, tenantID, bloomStore, series)
}

func writeSeries(t *testing.T, nSeries int, nLogsPerSeries int, cliDistributor *client.Client, now time.Time, seriesPrefix string) []labels.Labels {
series := make([]labels.Labels, 0, nSeries)
for i := 0; i < nSeries; i++ {
lbs := labels.FromStrings("job", fmt.Sprintf("%s-%d", seriesPrefix, i))
series = append(series, lbs)

for j := 0; j < nLogsPerSeries; j++ {
// Only write structured metadata for half of the series
var metadata map[string]string
if i%2 == 0 {
metadata = map[string]string{
"traceID": fmt.Sprintf("%d%d", i, j),
"user": fmt.Sprintf("%d%d", i, j%10),
}
}

require.NoError(t, cliDistributor.PushLogLine(
fmt.Sprintf("log line %d", j),
now,
metadata,
lbs.Map(),
))
}
}
return series
}

func checkCompactionFinished(t *testing.T, cliCompactor *client.Client) {
checkForTimestampMetric(t, cliCompactor, "loki_boltdb_shipper_compact_tables_operation_last_successful_run_timestamp_seconds")
}
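
Why the test writes structured metadata for only half of the series (a sketch, not part of the diff): blooms are built from structured metadata tokens, so a series that never carries metadata yields an empty bloom, and the split exercises both the populated-bloom path and the new indexed-without-offsets path. The tokensFor helper below is a toy stand-in for the real tokenizer:

package main

import "fmt"

// tokensFor is a toy stand-in for the bloom tokenizer: only structured
// metadata contributes tokens, so no metadata means an empty bloom.
func tokensFor(metadata map[string]string) []string {
	var toks []string
	for k, v := range metadata {
		toks = append(toks, k+"="+v)
	}
	return toks
}

func main() {
	for i := 0; i < 4; i++ {
		var metadata map[string]string
		if i%2 == 0 { // mirror the test: metadata only for even series
			metadata = map[string]string{"traceID": fmt.Sprintf("%d", i)}
		}
		toks := tokensFor(metadata)
		fmt.Printf("series %d: %d tokens (empty bloom=%t)\n", i, len(toks), len(toks) == 0)
	}
}
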
7 changes: 5 additions & 2 deletions pkg/bloombuild/builder/spec.go
@@ -137,7 +137,7 @@ func (s *SimpleBloomGenerator) Generate(ctx context.Context) *LazyBlockBuilderIt
)
}

return NewLazyBlockBuilderIterator(ctx, s.opts, s.metrics, s.populator(ctx), s.writerReaderFunc, series, s.blocksIter)
return NewLazyBlockBuilderIterator(ctx, s.opts, s.metrics, s.logger, s.populator(ctx), s.writerReaderFunc, series, s.blocksIter)
}

// LazyBlockBuilderIterator is a lazy iterator over blocks that builds
@@ -146,6 +147,7 @@ type LazyBlockBuilderIterator struct {
ctx context.Context
opts v1.BlockOptions
metrics *v1.Metrics
logger log.Logger
populate v1.BloomPopulatorFunc
writerReaderFunc func() (v1.BlockWriter, v1.BlockReader)
series iter.PeekIterator[*v1.Series]
@@ -160,6 +161,7 @@ func NewLazyBlockBuilderIterator(
ctx context.Context,
opts v1.BlockOptions,
metrics *v1.Metrics,
logger log.Logger,
populate v1.BloomPopulatorFunc,
writerReaderFunc func() (v1.BlockWriter, v1.BlockReader),
series iter.PeekIterator[*v1.Series],
@@ -169,6 +171,7 @@
ctx: ctx,
opts: opts,
metrics: metrics,
logger: logger,
populate: populate,
writerReaderFunc: writerReaderFunc,
series: series,
@@ -196,7 +199,7 @@ func (b *LazyBlockBuilderIterator) Next() bool {
return false
}

mergeBuilder := v1.NewMergeBuilder(b.blocks, b.series, b.populate, b.metrics)
mergeBuilder := v1.NewMergeBuilder(b.blocks, b.series, b.populate, b.metrics, b.logger)
writer, reader := b.writerReaderFunc()
blockBuilder, err := v1.NewBlockBuilder(b.opts, writer)
if err != nil {
12 changes: 12 additions & 0 deletions pkg/storage/bloom/v1/bloom_builder.go
@@ -28,6 +28,10 @@ func NewBloomBlockBuilder(opts BlockOptions, writer io.WriteCloser) *BloomBlockB
}
}

func (b *BloomBlockBuilder) UnflushedSize() int {
return b.scratch.Len() + b.page.UnflushedSize()
}

func (b *BloomBlockBuilder) Append(bloom *Bloom) (BloomOffset, error) {
if !b.writtenSchema {
if err := b.writeSchema(); err != nil {
@@ -68,6 +72,14 @@ func (b *BloomBlockBuilder) writeSchema() error {
}

func (b *BloomBlockBuilder) Close() (uint32, error) {
if !b.writtenSchema {
// We will get here only if we haven't appended any bloom filters to the block
// This would happen only if all series yielded empty blooms
if err := b.writeSchema(); err != nil {
return 0, errors.Wrap(err, "writing schema")
}
}

if b.page.Count() > 0 {
if err := b.flushPage(); err != nil {
return 0, errors.Wrap(err, "flushing final bloom page")
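
The Close() addition covers blocks where every series yielded an empty bloom: since the schema is otherwise written lazily on the first Append, a block with zero appended blooms would previously lack its schema header and be undecodable. A self-contained toy of that lazy-header pattern (stand-in types, not the real BloomBlockBuilder):

package main

import (
	"bytes"
	"fmt"
)

type builder struct {
	buf         bytes.Buffer
	wroteHeader bool
}

func (b *builder) writeHeader() {
	b.buf.WriteString("SCHEMA-V1|")
	b.wroteHeader = true
}

func (b *builder) Append(item string) {
	if !b.wroteHeader {
		b.writeHeader() // normal path: header precedes the first item
	}
	b.buf.WriteString(item + "|")
}

func (b *builder) Close() []byte {
	if !b.wroteHeader {
		b.writeHeader() // the fix: zero appends still produce a valid header
	}
	return b.buf.Bytes()
}

func main() {
	var b builder
	fmt.Printf("%s\n", b.Close()) // SCHEMA-V1| — decodable, though empty
}
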
15 changes: 15 additions & 0 deletions pkg/storage/bloom/v1/builder.go
@@ -5,6 +5,8 @@ import (
"hash"
"io"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/pkg/errors"

"github.com/grafana/loki/v3/pkg/compression"
@@ -112,6 +114,10 @@ func (w *PageWriter) Reset() {
w.n = 0
}

func (w *PageWriter) UnflushedSize() int {
return w.enc.Len()
}

func (w *PageWriter) SpaceFor(numBytes int) bool {
// if a single bloom exceeds the target size, still accept it
// otherwise only accept it if adding it would not exceed the target size
@@ -189,6 +195,7 @@ type MergeBuilder struct {
// Add chunks of a single series to a bloom
populate BloomPopulatorFunc
metrics *Metrics
logger log.Logger
}

type BloomPopulatorFunc func(series *Series, preExistingBlooms iter.SizedIterator[*Bloom], chunksToAdd ChunkRefs, ch chan *BloomCreation)
@@ -202,6 +209,7 @@ func NewMergeBuilder(
store iter.Iterator[*Series],
populate BloomPopulatorFunc,
metrics *Metrics,
logger log.Logger,
) *MergeBuilder {
// combinedSeriesIter handles series with fingerprint collisions:
// because blooms dont contain the label-set (only the fingerprint),
Expand Down Expand Up @@ -229,6 +237,7 @@ func NewMergeBuilder(
store: combinedSeriesIter,
populate: populate,
metrics: metrics,
logger: logger,
}
}

@@ -306,6 +315,12 @@ func (mb *MergeBuilder) processNextSeries(
if creation.Err != nil {
return nil, info.sourceBytes, 0, false, false, errors.Wrap(creation.Err, "populating bloom")
}

if creation.Bloom.IsEmpty() {
level.Debug(mb.logger).Log("msg", "received empty bloom. Adding to index but skipping offsets", "fingerprint", nextInStore.Fingerprint)
continue
}

offset, err := builder.AddBloom(creation.Bloom)
if err != nil {
return nil, info.sourceBytes, 0, false, false, errors.Wrapf(
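
The net effect on the block layout: an empty-bloom series keeps its index entry (fingerprint and chunk refs) but gets no bloom offset, so readers observe len(series.Offsets) == 0. A minimal sketch of the three-way contract this creates for queriers (toy types, not the real v1.SeriesWithMeta):

package main

import "fmt"

type series struct {
	offsets []int // bloom page offsets; none means the bloom was empty
}

// classify mirrors the querier-side contract: absent from the index is
// "missed"; indexed with no offsets is "empty" (cannot filter, but not
// missing); indexed with offsets is "found" and the bloom can be tested.
func classify(s *series, inIndex bool) string {
	switch {
	case !inIndex:
		return "missed"
	case len(s.offsets) == 0:
		return "empty"
	default:
		return "found"
	}
}

func main() {
	fmt.Println(classify(&series{}, true))                   // empty
	fmt.Println(classify(&series{offsets: []int{42}}, true)) // found
	fmt.Println(classify(nil, false))                        // missed
}
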
7 changes: 6 additions & 1 deletion pkg/storage/bloom/v1/builder_test.go
@@ -6,6 +6,7 @@ import (
"sort"
"testing"

"github.com/go-kit/log"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"

@@ -263,7 +264,7 @@ func TestMergeBuilder(t *testing.T) {
)

// Ensure that the merge builder combines all the blocks correctly
mergeBuilder := NewMergeBuilder(dedupedBlocks(blocks), storeItr, populate, NewMetrics(nil))
mergeBuilder := NewMergeBuilder(dedupedBlocks(blocks), storeItr, populate, NewMetrics(nil), log.NewNopLogger())
indexBuf := bytes.NewBuffer(nil)
bloomsBuf := bytes.NewBuffer(nil)
writer := NewMemoryBlockWriter(indexBuf, bloomsBuf)
@@ -350,6 +351,8 @@ func TestMergeBuilderFingerprintCollision(t *testing.T) {
// We're not testing the ability to extend a bloom in this test
pop := func(_ *Series, _ iter.SizedIterator[*Bloom], _ ChunkRefs, ch chan *BloomCreation) {
bloom := NewBloom()
// Add something to the bloom so it's not empty
bloom.Add([]byte("hello"))
stats := indexingInfo{
sourceBytes: int(bloom.Capacity()) / 8,
indexedFields: NewSetFromLiteral[Field]("__all__"),
@@ -367,6 +370,7 @@ func TestMergeBuilderFingerprintCollision(t *testing.T) {
iter.NewSliceIter(data),
pop,
NewMetrics(nil),
log.NewNopLogger(),
)

_, _, err = mergeBuilder.Build(builder)
@@ -539,6 +543,7 @@ func TestMergeBuilder_Roundtrip(t *testing.T) {
dedupedStore,
pop,
NewMetrics(nil),
log.NewNopLogger(),
)
builder, err := NewBlockBuilder(blockOpts, writer)
require.Nil(t, err)
53 changes: 48 additions & 5 deletions pkg/storage/bloom/v1/fuse.go
@@ -32,6 +32,8 @@ func NewBloomRecorder(ctx context.Context, id string) *BloomRecorder {
chunksSkipped: atomic.NewInt64(0),
seriesMissed: atomic.NewInt64(0),
chunksMissed: atomic.NewInt64(0),
seriesEmpty: atomic.NewInt64(0),
chunksEmpty: atomic.NewInt64(0),
chunksFiltered: atomic.NewInt64(0),
}
}
@@ -45,6 +47,8 @@ type BloomRecorder struct {
seriesSkipped, chunksSkipped *atomic.Int64
// not found in bloom
seriesMissed, chunksMissed *atomic.Int64
// exists in block index but empty offsets
seriesEmpty, chunksEmpty *atomic.Int64
// filtered out
chunksFiltered *atomic.Int64
}
@@ -56,6 +60,8 @@ func (r *BloomRecorder) Merge(other *BloomRecorder) {
r.chunksSkipped.Add(other.chunksSkipped.Load())
r.seriesMissed.Add(other.seriesMissed.Load())
r.chunksMissed.Add(other.chunksMissed.Load())
r.seriesEmpty.Add(other.seriesEmpty.Load())
r.chunksEmpty.Add(other.chunksEmpty.Load())
r.chunksFiltered.Add(other.chunksFiltered.Load())
}

@@ -66,13 +72,15 @@ func (r *BloomRecorder) Report(logger log.Logger, metrics *Metrics) {
seriesFound = r.seriesFound.Load()
seriesSkipped = r.seriesSkipped.Load()
seriesMissed = r.seriesMissed.Load()
seriesRequested = seriesFound + seriesSkipped + seriesMissed
seriesEmpty = r.seriesEmpty.Load()
seriesRequested = seriesFound + seriesSkipped + seriesMissed + seriesEmpty

chunksFound = r.chunksFound.Load()
chunksSkipped = r.chunksSkipped.Load()
chunksMissed = r.chunksMissed.Load()
chunksFiltered = r.chunksFiltered.Load()
chunksRequested = chunksFound + chunksSkipped + chunksMissed
chunksEmpty = r.chunksEmpty.Load()
chunksRequested = chunksFound + chunksSkipped + chunksMissed + chunksEmpty
)
level.Debug(logger).Log(
"recorder_msg", "bloom search results",
@@ -82,37 +90,41 @@
"recorder_series_found", seriesFound,
"recorder_series_skipped", seriesSkipped,
"recorder_series_missed", seriesMissed,
"recorder_series_empty", seriesEmpty,

"recorder_chunks_requested", chunksRequested,
"recorder_chunks_found", chunksFound,
"recorder_chunks_skipped", chunksSkipped,
"recorder_chunks_missed", chunksMissed,
"recorder_chunks_empty", chunksEmpty,
"recorder_chunks_filtered", chunksFiltered,
)

if metrics != nil {
metrics.recorderSeries.WithLabelValues(recorderRequested).Add(float64(seriesRequested))
metrics.recorderSeries.WithLabelValues(recorderFound).Add(float64(seriesFound))
metrics.recorderSeries.WithLabelValues(recorderSkipped).Add(float64(seriesSkipped))
metrics.recorderSeries.WithLabelValues(recorderEmpty).Add(float64(seriesEmpty))
metrics.recorderSeries.WithLabelValues(recorderMissed).Add(float64(seriesMissed))

metrics.recorderChunks.WithLabelValues(recorderRequested).Add(float64(chunksRequested))
metrics.recorderChunks.WithLabelValues(recorderFound).Add(float64(chunksFound))
metrics.recorderChunks.WithLabelValues(recorderSkipped).Add(float64(chunksSkipped))
metrics.recorderChunks.WithLabelValues(recorderMissed).Add(float64(chunksMissed))
metrics.recorderChunks.WithLabelValues(recorderEmpty).Add(float64(chunksEmpty))
metrics.recorderChunks.WithLabelValues(recorderFiltered).Add(float64(chunksFiltered))
}
}

func (r *BloomRecorder) record(
seriesFound, chunksFound, seriesSkipped, chunksSkipped, seriesMissed, chunksMissed, chunksFiltered int,
) {
func (r *BloomRecorder) record(seriesFound, chunksFound, seriesSkipped, chunksSkipped, seriesMissed, chunksMissed, seriesEmpty, chunksEmpty, chunksFiltered int) {
r.seriesFound.Add(int64(seriesFound))
r.chunksFound.Add(int64(chunksFound))
r.seriesSkipped.Add(int64(seriesSkipped))
r.chunksSkipped.Add(int64(chunksSkipped))
r.seriesMissed.Add(int64(seriesMissed))
r.chunksMissed.Add(int64(chunksMissed))
r.seriesEmpty.Add(int64(seriesEmpty))
r.chunksEmpty.Add(int64(chunksEmpty))
r.chunksFiltered.Add(int64(chunksFiltered))
}

@@ -170,6 +182,7 @@ func (fq *FusedQuerier) recordMissingFp(
0, 0, // found
0, 0, // skipped
1, len(input.Chks), // missed
0, 0, // empty
0, // chunks filtered
)
})
@@ -184,6 +197,22 @@
0, 0, // found
1, len(input.Chks), // skipped
0, 0, // missed
0, 0, // empty
0, // chunks filtered
)
})
}

func (fq *FusedQuerier) recordEmptyFp(
batch []Request,
fp model.Fingerprint,
) {
fq.noRemovals(batch, fp, func(input Request) {
input.Recorder.record(
0, 0, // found
0, 0, // skipped
0, 0, // missed
1, len(input.Chks), // empty
0, // chunks filtered
)
})
@@ -280,6 +309,19 @@ func (fq *FusedQuerier) runSeries(_ Schema, series *SeriesWithMeta, reqs []Reque
})
}

if len(series.Offsets) == 0 {
// We end up here for series with no structured metadata fields.
// While building blooms, these series would yield empty blooms.
// We add these series to the index of the block so we don't report them as missing,
// but we don't filter any chunks for them.
level.Debug(fq.logger).Log(
"msg", "series with empty offsets",
"fp", series.Fingerprint,
)
fq.recordEmptyFp(reqs, series.Fingerprint)
return
}

for i, offset := range series.Offsets {
skip := fq.bq.blooms.LoadOffset(offset)
if skip {
@@ -361,6 +403,7 @@
1, len(inputs[i].InBlooms), // found
0, 0, // skipped
0, len(inputs[i].Missing), // missed
0, 0, // empty
len(removals), // filtered
)
req.Response <- Output{
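
With the new bucket, the recorder's accounting identity becomes requested = found + skipped + missed + empty for both series and chunks, which is why Report() adds the empty counters to the requested totals. A trivial reconciliation check (plain ints standing in for the atomic counters):

package main

import "fmt"

func main() {
	// Stand-ins for the recorder's atomic counters.
	found, skipped, missed, empty := int64(10), int64(2), int64(1), int64(3)
	requested := found + skipped + missed + empty // every series lands in exactly one bucket
	fmt.Printf("requested=%d found=%d skipped=%d missed=%d empty=%d\n",
		requested, found, skipped, missed, empty)
}
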
3 changes: 2 additions & 1 deletion pkg/storage/bloom/v1/index.go
@@ -153,7 +153,8 @@ func aggregateHeaders(xs []SeriesHeader) SeriesHeader {
fromFp, _ := xs[0].Bounds.Bounds()
_, throughFP := xs[len(xs)-1].Bounds.Bounds()
res := SeriesHeader{
Bounds: NewBounds(fromFp, throughFP),
NumSeries: len(xs),
Bounds: NewBounds(fromFp, throughFP),
}

for i, x := range xs {
(6 more changed files not shown.)
