From c2c3b905e09580cc83615c83b634c5824cb255f8 Mon Sep 17 00:00:00 2001 From: Poyzan <31743851+poyzannur@users.noreply.github.com> Date: Wed, 1 Nov 2023 15:47:36 +0000 Subject: [PATCH] Add series population to tokenizer (#11070) **What this PR does / why we need it**: I started to use the tokenizer module for bloom creation, and populating the SBF without the series and chunk refs is awkward, as we populate only half of the data structure. I modified it to populate the entire `SeriesWithBloom`, as it has all the information. **Which issue(s) this PR fixes**: Fixes # **Special notes for your reviewer**: **Checklist** - [ ] Reviewed the [`CONTRIBUTING.md`](https://github.com/grafana/loki/blob/main/CONTRIBUTING.md) guide (**required**) - [ ] Documentation added - [ ] Tests updated - [ ] `CHANGELOG.md` updated - [ ] If the change is worth mentioning in the release notes, add `add-to-release-notes` label - [ ] Changes that require user attention or interaction to upgrade are documented in `docs/sources/setup/upgrade/_index.md` - [ ] For Helm chart changes bump the Helm chart version in `production/helm/loki/Chart.yaml` and update `production/helm/loki/CHANGELOG.md` and `production/helm/loki/README.md`. [Example PR](https://github.com/grafana/loki/commit/d10549e3ece02120974929894ee333d07755d213) - [ ] If the change is deprecating or removing a configuration option, update the `deprecated-config.yaml` and `deleted-config.yaml` files respectively in the `tools/deprecated-config-checker` directory. 
--------- Co-authored-by: Paul Rogers --- pkg/storage/bloom/v1/bloom_tokenizer.go | 18 ++++++++++++++---- tools/tsdb/bloom-tester/lib.go | 14 ++++++++++++-- 2 files changed, 26 insertions(+), 6 deletions(-) diff --git a/pkg/storage/bloom/v1/bloom_tokenizer.go b/pkg/storage/bloom/v1/bloom_tokenizer.go index 7060052438190..e2659180b4eac 100644 --- a/pkg/storage/bloom/v1/bloom_tokenizer.go +++ b/pkg/storage/bloom/v1/bloom_tokenizer.go @@ -11,7 +11,6 @@ import ( "github.com/grafana/loki/pkg/chunkenc" "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logql/log" - "github.com/grafana/loki/pkg/storage/bloom/v1/filter" "github.com/grafana/loki/pkg/storage/chunk" util_log "github.com/grafana/loki/pkg/util/log" //"github.com/grafana/loki/tools/tsdb/helpers" @@ -69,20 +68,26 @@ func clearCache(cache map[string]interface{}) { } } -func (bt *BloomTokenizer) PopulateSBF(sbf *filter.ScalableBloomFilter, chunks []chunk.Chunk) { +func (bt *BloomTokenizer) PopulateSeriesWithBloom(seriesWithBloom *SeriesWithBloom, chunks []chunk.Chunk) { clearCache(bt.cache) for idx := range chunks { lc := chunks[idx].Data.(*chunkenc.Facade).LokiChunk() bt.chunkIDTokenizer.Reinit(chunks[idx].ChunkRef) // TODO: error handling - itr, _ := lc.Iterator( + itr, err := lc.Iterator( context.Background(), time.Unix(0, 0), // TODO: Parameterize/better handle the timestamps? 
time.Unix(0, math.MaxInt64), logproto.FORWARD, log.NewNoopPipeline().ForStream(chunks[idx].Metric), ) + if err != nil { + level.Info(util_log.Logger).Log("chunk iterator cannot be created") + return + } + + defer itr.Close() for itr.Next() && itr.Error() == nil { toks := bt.chunkIDTokenizer.Tokens(itr.Entry().Line) @@ -94,7 +99,7 @@ func (bt *BloomTokenizer) PopulateSBF(sbf *filter.ScalableBloomFilter, chunks [] if !found { bt.cache[str] = nil - sbf.TestAndAdd(tok.Key) + seriesWithBloom.Bloom.ScalableBloomFilter.TestAndAdd(tok.Key) if len(bt.cache) > 150000 { // While crude, this has proven efficient in performance testing. This speaks to the similarity in log lines near each other clearCache(bt.cache) @@ -103,6 +108,11 @@ func (bt *BloomTokenizer) PopulateSBF(sbf *filter.ScalableBloomFilter, chunks [] } } } + seriesWithBloom.Series.Chunks = append(seriesWithBloom.Series.Chunks, ChunkRef{ + Start: chunks[idx].From, + End: chunks[idx].Through, + Checksum: chunks[idx].Checksum, + }) } // for each chunk } diff --git a/tools/tsdb/bloom-tester/lib.go b/tools/tsdb/bloom-tester/lib.go index 2a40f162a8995..18ceb14b6b611 100644 --- a/tools/tsdb/bloom-tester/lib.go +++ b/tools/tsdb/bloom-tester/lib.go @@ -350,7 +350,17 @@ func analyze(metrics *Metrics, sampler Sampler, indexShipper indexshipper.IndexS startTime := time.Now().UnixMilli() sbf := experiment.bloom() - bloomTokenizer.PopulateSBF(sbf, got) + bloom := bt.Bloom{ + ScalableBloomFilter: *sbf, + } + series := bt.Series{ + Fingerprint: fp, + } + swb := bt.SeriesWithBloom{ + Bloom: &bloom, + Series: &series, + } + bloomTokenizer.PopulateSeriesWithBloom(&swb, got) endTime := time.Now().UnixMilli() if len(got) > 0 { @@ -361,7 +371,7 @@ func analyze(metrics *Metrics, sampler Sampler, indexShipper indexshipper.IndexS float64(estimatedCount(sbf.Capacity(), sbf.FillRatio())), ) - writeSBF(sbf, + writeSBF(&swb.Bloom.ScalableBloomFilter, os.Getenv("DIR"), fmt.Sprint(bucketPrefix, experiment.name), os.Getenv("BUCKET"),