Add series population to tokenizer (grafana#11070)
**What this PR does / why we need it**:
I started using the tokenizer module for bloom creation, and populating only the SBF, without the series and its chunk refs, felt wrong: we were filling just half of the data structure. I changed the tokenizer to populate the entire `SeriesWithBloom` instead, since it has all the information needed.
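
For reviewers, here is a rough sketch of the types involved, inferred purely from how the diff below accesses them; the field layout and imports are assumptions for illustration, not the authoritative definitions in `pkg/storage/bloom/v1`:

```go
package sketch

import (
	"github.com/grafana/loki/pkg/storage/bloom/v1/filter"
	"github.com/prometheus/common/model"
)

// Bloom wraps the scalable bloom filter; the diff initializes it as
// bt.Bloom{ScalableBloomFilter: *sbf}, which suggests an embedded field.
type Bloom struct {
	filter.ScalableBloomFilter
}

// ChunkRef mirrors the fields appended in PopulateSeriesWithBloom.
// model.Time for Start/End is an assumption based on chunk.Chunk.From/Through.
type ChunkRef struct {
	Start    model.Time
	End      model.Time
	Checksum uint32
}

// Series carries the series identity plus the chunk refs the tokenizer now fills in.
type Series struct {
	Fingerprint model.Fingerprint // assumed type
	Chunks      []ChunkRef
}

// SeriesWithBloom is the structure this PR populates as a whole.
type SeriesWithBloom struct {
	Series *Series
	Bloom  *Bloom
}
```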

**Which issue(s) this PR fixes**:
Fixes #<issue number>

**Special notes for your reviewer**:

**Checklist**
- [ ] Reviewed the
[`CONTRIBUTING.md`](https://github.com/grafana/loki/blob/main/CONTRIBUTING.md)
guide (**required**)
- [ ] Documentation added
- [ ] Tests updated
- [ ] `CHANGELOG.md` updated
- [ ] If the change is worth mentioning in the release notes, add
`add-to-release-notes` label
- [ ] Changes that require user attention or interaction to upgrade are
documented in `docs/sources/setup/upgrade/_index.md`
- [ ] For Helm chart changes bump the Helm chart version in
`production/helm/loki/Chart.yaml` and update
`production/helm/loki/CHANGELOG.md` and
`production/helm/loki/README.md`. [Example
PR](grafana@d10549e)
- [ ] If the change is deprecating or removing a configuration option,
update the `deprecated-config.yaml` and `deleted-config.yaml` files
respectively in the `tools/deprecated-config-checker` directory. <!--
TODO(salvacorts): Add example PR -->

---------

Co-authored-by: Paul Rogers <[email protected]>
2 people authored and rhnasc committed Apr 12, 2024
1 parent 3aed067 commit c2c3b90
Showing 2 changed files with 26 additions and 6 deletions.
pkg/storage/bloom/v1/bloom_tokenizer.go (18 changes: 14 additions & 4 deletions)
@@ -11,7 +11,6 @@ import (
 	"github.com/grafana/loki/pkg/chunkenc"
 	"github.com/grafana/loki/pkg/logproto"
 	"github.com/grafana/loki/pkg/logql/log"
-	"github.com/grafana/loki/pkg/storage/bloom/v1/filter"
 	"github.com/grafana/loki/pkg/storage/chunk"
 	util_log "github.com/grafana/loki/pkg/util/log"
 	//"github.com/grafana/loki/tools/tsdb/helpers"
@@ -69,20 +68,26 @@ func clearCache(cache map[string]interface{}) {
 	}
 }
 
-func (bt *BloomTokenizer) PopulateSBF(sbf *filter.ScalableBloomFilter, chunks []chunk.Chunk) {
+func (bt *BloomTokenizer) PopulateSeriesWithBloom(seriesWithBloom *SeriesWithBloom, chunks []chunk.Chunk) {
 	clearCache(bt.cache)
 	for idx := range chunks {
 		lc := chunks[idx].Data.(*chunkenc.Facade).LokiChunk()
 		bt.chunkIDTokenizer.Reinit(chunks[idx].ChunkRef)
 
-		// TODO: error handling
-		itr, _ := lc.Iterator(
+		itr, err := lc.Iterator(
 			context.Background(),
 			time.Unix(0, 0), // TODO: Parameterize/better handle the timestamps?
 			time.Unix(0, math.MaxInt64),
 			logproto.FORWARD,
 			log.NewNoopPipeline().ForStream(chunks[idx].Metric),
 		)
+		if err != nil {
+			level.Info(util_log.Logger).Log("chunk iterator cannot be created")
+			return
+		}
+
+		defer itr.Close()
 
 		for itr.Next() && itr.Error() == nil {
 			toks := bt.chunkIDTokenizer.Tokens(itr.Entry().Line)
@@ -94,7 +99,7 @@ func (bt *BloomTokenizer) PopulateSBF(sbf *filter.ScalableBloomFilter, chunks []chunk.Chunk) {
 					if !found {
 						bt.cache[str] = nil
 
-						sbf.TestAndAdd(tok.Key)
+						seriesWithBloom.Bloom.ScalableBloomFilter.TestAndAdd(tok.Key)
 
 						if len(bt.cache) > 150000 { // While crude, this has proven efficient in performance testing. This speaks to the similarity in log lines near each other
 							clearCache(bt.cache)
@@ -103,6 +108,11 @@ func (bt *BloomTokenizer) PopulateSBF(sbf *filter.ScalableBloomFilter, chunks []chunk.Chunk) {
 				}
 			}
 		}
+		seriesWithBloom.Series.Chunks = append(seriesWithBloom.Series.Chunks, ChunkRef{
+			Start:    chunks[idx].From,
+			End:      chunks[idx].Through,
+			Checksum: chunks[idx].Checksum,
+		})
 	} // for each chunk
 }
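A note on the dedup cache visible in the hunks above: the tokenizer remembers tokens it has already added in a plain map and throws the whole map away once it passes 150,000 entries, rather than evicting individual keys. A minimal standalone sketch of that pattern, with illustrative names (not the actual Loki code):

```go
package sketch

// maxCacheSize matches the threshold used in the diff above.
const maxCacheSize = 150000

// dedupAdd adds key to the filter only if it has not been seen recently.
// addFn stands in for the ScalableBloomFilter's TestAndAdd method.
func dedupAdd(cache map[string]interface{}, key []byte, addFn func([]byte) bool) {
	k := string(key)
	if _, found := cache[k]; found {
		return // already added during this pass; skip the filter entirely
	}
	cache[k] = nil
	addFn(key)
	if len(cache) > maxCacheSize {
		// Crude but effective per the in-code comment: drop everything at once.
		for k := range cache {
			delete(cache, k)
		}
	}
}
```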
tools/tsdb/bloom-tester/lib.go (14 changes: 12 additions & 2 deletions)

@@ -350,7 +350,17 @@ func analyze(metrics *Metrics, sampler Sampler, indexShipper indexshipper.IndexShipper,
 			startTime := time.Now().UnixMilli()
 
 			sbf := experiment.bloom()
-			bloomTokenizer.PopulateSBF(sbf, got)
+			bloom := bt.Bloom{
+				ScalableBloomFilter: *sbf,
+			}
+			series := bt.Series{
+				Fingerprint: fp,
+			}
+			swb := bt.SeriesWithBloom{
+				Bloom:  &bloom,
+				Series: &series,
+			}
+			bloomTokenizer.PopulateSeriesWithBloom(&swb, got)
 
 			endTime := time.Now().UnixMilli()
 			if len(got) > 0 {
@@ -361,7 +371,7 @@ func analyze(metrics *Metrics, sampler Sampler, indexShipper indexshipper.IndexShipper,
 					float64(estimatedCount(sbf.Capacity(), sbf.FillRatio())),
 				)
 
-				writeSBF(sbf,
+				writeSBF(&swb.Bloom.ScalableBloomFilter,
 					os.Getenv("DIR"),
 					fmt.Sprint(bucketPrefix, experiment.name),
 					os.Getenv("BUCKET"),
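For context on `estimatedCount(sbf.Capacity(), sbf.FillRatio())` in the hunk above: the standard way to estimate how many distinct items a bloom filter holds is to invert its expected fill ratio. A hedged sketch of that textbook formula; the actual helper in `lib.go` may treat the hash-function count `k` differently (e.g. assume `k = 1`):

```go
package sketch

import "math"

// estimatedCount inverts the expected fill ratio of a bloom filter.
// With m bits, k hash functions, and n inserted items, the expected
// fill ratio is p ≈ 1 - e^(-k*n/m), so n ≈ -(m/k) * ln(1-p).
func estimatedCount(m, k uint, p float64) uint {
	if p >= 1 {
		return m // saturated filter: the estimate diverges, so clamp it
	}
	return uint(-(float64(m) / float64(k)) * math.Log(1-p))
}
```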
