diff --git a/pkg/storage/bloom/v1/bloom_tokenizer.go b/pkg/storage/bloom/v1/bloom_tokenizer.go
index 3d04728937cf0..939c91c214398 100644
--- a/pkg/storage/bloom/v1/bloom_tokenizer.go
+++ b/pkg/storage/bloom/v1/bloom_tokenizer.go
@@ -45,28 +45,6 @@ func NewBloomTokenizer(maxBloomSize int, metrics *Metrics, logger log.Logger) *B
 	}
 }
 
-// prefixedToken returns a byte slice with sufficient capacity for a chunk-ref prefixed token
-// of specific ngram length, along with the length of the prefix.
-// It ensures enough capacity for the prefix and the token so additional tokens can be created
-// without allocations by appending them to the prefix length
-// If the buffer is nil or too small, a new one is created. The buffer is returned for reuse.
-func prefixedToken(ngram int, chk ChunkRef, buf []byte) ([]byte, int) {
-	enc := encoding.EncWith(buf)
-	enc.Reset()
-	enc.PutBE64(uint64(chk.From))
-	enc.PutBE64(uint64(chk.Through))
-	enc.PutBE32(chk.Checksum)
-	prefixLn := enc.Len() // record the length of the prefix
-
-	// If the buffer is too small, ensure enough capacity for the ngram
-	if cap(enc.Get()) < prefixLn+ngram*MaxRuneLen {
-		enc.PutBytes(make([]byte, ngram*MaxRuneLen))
-	}
-
-	// return the underlying byte slice and the length of the prefix
-	return enc.Get(), prefixLn
-}
-
 // ChunkRefWithIter is a wrapper around a ChunkRef and an EntryIterator.
 type ChunkRefWithIter struct {
 	Ref ChunkRef
diff --git a/pkg/storage/bloom/v1/bloom_tokenizer_test.go b/pkg/storage/bloom/v1/bloom_tokenizer_test.go
index f1514eca9a53c..fe663efc0b85c 100644
--- a/pkg/storage/bloom/v1/bloom_tokenizer_test.go
+++ b/pkg/storage/bloom/v1/bloom_tokenizer_test.go
@@ -19,7 +19,6 @@ import (
 
 	"github.com/grafana/loki/pkg/push"
 
-	"github.com/prometheus/common/model"
 	"github.com/stretchr/testify/require"
 
 	"github.com/grafana/loki/v3/pkg/storage/bloom/v1/filter"
@@ -27,54 +26,7 @@ import (
 	"github.com/prometheus/client_golang/prometheus"
 )
 
-var (
-	four    = NewNGramTokenizer(4, 0)
-	metrics = NewMetrics(prometheus.DefaultRegisterer)
-)
-
-func TestPrefixedKeyCreation(t *testing.T) {
-	t.Parallel()
-	var ones uint64 = 0xffffffffffffffff
-
-	ref := ChunkRef{
-		From:     0,
-		Through:  model.Time(int64(ones)),
-		Checksum: 0xffffffff,
-	}
-	for _, tc := range []struct {
-		desc          string
-		ngram, expLen int
-	}{
-		{
-			desc:   "0-gram",
-			ngram:  0,
-			expLen: 20,
-		},
-		{
-			desc:   "4-gram",
-			ngram:  4,
-			expLen: 20 + 4*MaxRuneLen,
-		},
-	} {
-		t.Run(tc.desc, func(t *testing.T) {
-			token, prefixLn := prefixedToken(tc.ngram, ref, nil)
-			require.Equal(t, 20, prefixLn)
-			require.Equal(t, tc.expLen, len(token))
-			// first 8 bytes should be zeros from `from`
-			for i := 0; i < 8; i++ {
-				require.Equal(t, byte(0), token[i])
-			}
-			// next 8 bytes should be ones from `through`
-			for i := 8; i < 16; i++ {
-				require.Equal(t, byte(255), token[i])
-			}
-			// next 4 bytes should be ones from `checksum`
-			for i := 16; i < 20; i++ {
-				require.Equal(t, byte(255), token[i])
-			}
-		})
-	}
-}
+var metrics = NewMetrics(prometheus.DefaultRegisterer)
 
 func TestTokenizerPopulate(t *testing.T) {
 	t.Parallel()
@@ -248,7 +200,6 @@ func populateAndConsumeBloom(
 
 func BenchmarkPopulateSeriesWithBloom(b *testing.B) {
 	for i := 0; i < b.N; i++ {
-		var testLine = lorem + lorem + lorem
 		bt := NewBloomTokenizer(0, metrics, logger.NewNopLogger())
 
 		sbf := filter.NewScalableBloomFilter(1024, 0.01, 0.8)
@@ -256,7 +207,11 @@ func BenchmarkPopulateSeriesWithBloom(b *testing.B) {
 		memChunk := chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, chunkenc.EncSnappy, chunkenc.ChunkHeadFormatFor(chunkenc.ChunkFormatV4), 256000, 1500000)
 		_, _ = memChunk.Append(&push.Entry{
 			Timestamp: time.Unix(0, 1),
-			Line:      testLine,
+			Line:      "",
+			StructuredMetadata: push.LabelsAdapter{
+				push.LabelAdapter{Name: "trace_id", Value: fmt.Sprintf("%04x", i)},
+				push.LabelAdapter{Name: "org_id", Value: fmt.Sprintf("%d", i%1000)},
+			},
 		})
 		itr, err := memChunk.Iterator(
 			context.Background(),
diff --git a/pkg/storage/bloom/v1/fuse_test.go b/pkg/storage/bloom/v1/fuse_test.go
index bf11703c2355b..486ebcce0cf34 100644
--- a/pkg/storage/bloom/v1/fuse_test.go
+++ b/pkg/storage/bloom/v1/fuse_test.go
@@ -167,20 +167,20 @@ func TestFusedQuerier_MultiPage(t *testing.T) {
 		Chunks:      []ChunkRef{chk},
 	}
 
-	buf, prefixLn := prefixedToken(3, chk, nil)
+	buf := prefixForChunkRef(chk)
 
 	b1 := &Bloom{
 		*filter.NewScalableBloomFilter(1024, 0.01, 0.8),
 	}
 	key1, key2 := []byte("foo"), []byte("bar")
 	b1.Add(key1)
-	b1.Add(append(buf[:prefixLn], key1...))
+	b1.Add(append(buf, key1...))
 
 	b2 := &Bloom{
 		*filter.NewScalableBloomFilter(1024, 0.01, 0.8),
 	}
 	b2.Add(key2)
-	b2.Add(append(buf[:prefixLn], key2...))
+	b2.Add(append(buf, key2...))
 
 	_, err = builder.BuildFrom(v2.NewSliceIter([]SeriesWithBlooms{
 		{
@@ -257,7 +257,7 @@ func TestLazyBloomIter_Seek_ResetError(t *testing.T) {
 
 	numSeries := 4
 	data := make([]SeriesWithBlooms, 0, numSeries)
-	tokenizer := NewNGramTokenizer(4, 0)
+
 	for i := 0; i < numSeries; i++ {
 		var series Series
 		series.Fingerprint = model.Fingerprint(i)
@@ -280,12 +280,8 @@ func TestLazyBloomIter_Seek_ResetError(t *testing.T) {
 		}
 
 		for j := 0; j < nLines; j++ {
-			line := fmt.Sprintf("%04x:%04x", i, j)
-			it := tokenizer.Tokens(line)
-			for it.Next() {
-				key := it.At()
-				bloom.Add(key)
-			}
+			key := fmt.Sprintf("%04x:%04x", i, j)
+			bloom.Add([]byte(key))
 		}
 
 		data = append(data, SeriesWithBlooms{
diff --git a/pkg/storage/bloom/v1/tokenizer.go b/pkg/storage/bloom/v1/tokenizer.go
index 5cbf199448f68..59654747832d8 100644
--- a/pkg/storage/bloom/v1/tokenizer.go
+++ b/pkg/storage/bloom/v1/tokenizer.go
@@ -2,17 +2,12 @@ package v1
 
 import (
 	"fmt"
-	"unicode/utf8"
 
 	iter "github.com/grafana/loki/v3/pkg/iter/v2"
 
 	"github.com/grafana/loki/pkg/push"
 )
 
-const (
-	MaxRuneLen = 4
-)
-
 type StructuredMetadataTokenizer struct {
 	// prefix to add to tokens, typically the encoded chunkref
 	prefix string
@@ -26,7 +21,6 @@ func NewStructuredMetadataTokenizer(prefix string) *StructuredMetadataTokenizer
 	}
 }
 
-// Tokens implements the NGramBuilder interface
 func (t *StructuredMetadataTokenizer) Tokens(kv push.LabelAdapter) iter.Iterator[string] {
 	combined := fmt.Sprintf("%s=%s", kv.Name, kv.Value)
 	t.tokens = append(t.tokens[:0],
@@ -36,118 +30,3 @@ func (t *StructuredMetadataTokenizer) Tokens(kv push.LabelAdapter) iter.Iterator
 	)
 	return iter.NewSliceIter(t.tokens)
 }
-
-func reassemble(buf []rune, ln, pos int, result []byte) []byte {
-	result = result[:0] // Reset the result slice
-	for i := 0; i < ln; i++ {
-		cur := pos % len(buf)
-		pos++
-		result = utf8.AppendRune(result, buf[cur])
-	}
-	return result
-}
-
-// Iterable variants (more performant, less space)
-type NGramTokenizer struct {
-	n, skip int
-	buffer  []rune // circular buffer used for ngram generation
-	res     []byte // buffer used for token generation
-}
-
-func (t *NGramTokenizer) N() int {
-	return t.n
-}
-
-func (t *NGramTokenizer) SkipFactor() int {
-	return t.skip
-}
-
-/*
-N-Grams (https://en.wikipedia.org/wiki/N-gram) are a series of 'n' adjacent characters in a string.
-These will be utilized for the bloom filters to allow for fuzzy searching.
-*/
-func NewNGramTokenizer(n, skip int) *NGramTokenizer {
-	t := &NGramTokenizer{
-		n:      n,
-		skip:   skip,
-		buffer: make([]rune, n+skip),
-		res:    make([]byte, 0, n*MaxRuneLen), // maximum 4 bytes per rune
-	}
-
-	return t
-}
-
-// Token implements the NGramBuilder interface
-// The Token iterator uses shared buffers for performance. The []byte returned by At()
-// is not safe for use after subsequent calls to Next()
-func (t *NGramTokenizer) Tokens(line string) iter.Iterator[[]byte] {
-	return &NGramTokenIter{
-		n:    t.N(),
-		skip: t.SkipFactor(),
-
-		line: line,
-
-		buffer: t.buffer,
-		res:    t.res,
-	}
-}
-
-type NGramTokenIter struct {
-	n, skip int
-
-	runeIndex, offset int
-	line              string // source
-
-	buffer []rune // circular buffers used for ngram generation
-	res    []byte
-}
-
-func (t *NGramTokenIter) Next() bool {
-	for i, r := range t.line[t.offset:] {
-		t.buffer[t.runeIndex%len(t.buffer)] = r
-		t.runeIndex++
-
-		if t.runeIndex < t.n {
-			continue
-		}
-
-		// if the start of the ngram is at the interval of our skip factor, emit it.
-		// we increment the skip due to modulo logic:
-		// because `n % 0 is a divide by zero and n % 1 is always 0`
-		if (t.runeIndex-t.n)%(t.skip+1) == 0 {
-			// update the offset, but don't go past the end of the line;
-			// for instance invalid utf-8
-			t.offset = min(len(t.line), t.offset+i+utf8.RuneLen(r))
-			return true
-		}
-
-	}
-	return false
-}
-
-func (t *NGramTokenIter) At() []byte {
-	return reassemble(t.buffer, t.n, (t.runeIndex-t.n)%len(t.buffer), t.res[:0])
-}
-
-func (t *NGramTokenIter) Err() error {
-	return nil
-}
-
-type PrefixedTokenIter struct {
-	buf       []byte
-	prefixLen int
-
-	iter.Iterator[[]byte]
-}
-
-func (t *PrefixedTokenIter) At() []byte {
-	return append(t.buf[:t.prefixLen], t.Iterator.At()...)
-}
-
-func NewPrefixedTokenIter(buf []byte, prefixLn int, itr iter.Iterator[[]byte]) *PrefixedTokenIter {
-	return &PrefixedTokenIter{
-		buf:       buf,
-		prefixLen: prefixLn,
-		Iterator:  itr,
-	}
-}
diff --git a/pkg/storage/bloom/v1/tokenizer_test.go b/pkg/storage/bloom/v1/tokenizer_test.go
index f21aceca06402..0aeb0ba1f5510 100644
--- a/pkg/storage/bloom/v1/tokenizer_test.go
+++ b/pkg/storage/bloom/v1/tokenizer_test.go
@@ -2,7 +2,6 @@ package v1
 
 import (
 	"testing"
-	"unicode/utf8"
 
 	"github.com/stretchr/testify/require"
 
@@ -11,230 +10,6 @@ import (
 	"github.com/grafana/loki/pkg/push"
 )
 
-const BigFile = "../../../logql/sketch/testdata/war_peace.txt"
-
-func TestNGramIterator(t *testing.T) {
-	t.Parallel()
-	var (
-		three      = NewNGramTokenizer(3, 0)
-		threeSkip1 = NewNGramTokenizer(3, 1)
-		threeSkip3 = NewNGramTokenizer(3, 3)
-	)
-
-	for _, tc := range []struct {
-		desc  string
-		t     *NGramTokenizer
-		input string
-		exp   []string
-	}{
-		{
-			t:     three,
-			input: "",
-			exp:   []string{},
-		},
-		{
-			t:     three,
-			input: "ab",
-			exp:   []string{},
-		},
-		{
-			t:     three,
-			input: "abcdefg",
-			exp:   []string{"abc", "bcd", "cde", "def", "efg"},
-		},
-		{
-			t:     threeSkip1,
-			input: "abcdefg",
-			exp:   []string{"abc", "cde", "efg"},
-		},
-		{
-			t:     threeSkip3,
-			input: "abcdefgh",
-			exp:   []string{"abc", "efg"},
-		},
-		{
-			t:     three,
-			input: "日本語",
-			exp:   []string{"日本語"},
-		},
-		{
-			t:     four,
-			input: "日本語日本語",
-			exp: []string{
-				"日本語日",
-				"本語日本",
-				"語日本語"},
-		},
-	} {
-		t.Run(tc.desc, func(t *testing.T) {
-			itr := tc.t.Tokens(tc.input)
-			for _, exp := range tc.exp {
-				require.True(t, itr.Next())
-				require.Equal(t, exp, string(itr.At()))
-			}
-			require.False(t, itr.Next())
-		})
-	}
-}
-
-// Mainly this ensures we don't panic when a string ends in invalid utf8
-func TestInvalidUTF8(t *testing.T) {
-	x := NewNGramTokenizer(3, 0)
-
-	input := "abc\x80"
-	require.False(t, utf8.ValidString(input))
-	itr := x.Tokens(input)
-	require.True(t, itr.Next())
-	require.Equal(t, []byte("abc"), itr.At())
-	require.True(t, itr.Next())
-	// we don't really care about the final rune returned and it's probably not worth the perf cost
-	// to check for it
-	require.Equal(t, []byte{0x62, 0x63, 0xef, 0xbf, 0xbd}, itr.At())
-	require.False(t, itr.Next())
-}
-
-func TestPrefixedIterator(t *testing.T) {
-	t.Parallel()
-	var (
-		three = NewNGramTokenizer(3, 0)
-	)
-
-	for _, tc := range []struct {
-		desc  string
-		input string
-		exp   []string
-	}{
-		{
-			input: "",
-			exp:   []string{},
-		},
-		{
-			input: "ab",
-			exp:   []string{},
-		},
-		{
-			input: "abcdefg",
-			exp:   []string{"0123abc", "0123bcd", "0123cde", "0123def", "0123efg"},
-		},
-
-		{
-			input: "日本語",
-			exp:   []string{"0123日本語"},
-		},
-	} {
-		prefix := []byte("0123")
-		t.Run(tc.desc, func(t *testing.T) {
-			itr := NewPrefixedTokenIter(prefix, len(prefix), three.Tokens(tc.input))
-			for _, exp := range tc.exp {
-				require.True(t, itr.Next())
-				require.Equal(t, exp, string(itr.At()))
-			}
-			require.False(t, itr.Next())
-		})
-	}
-}
-
-const lorem = `
-lorum ipsum dolor sit amet consectetur adipiscing elit sed do eiusmod tempor incididunt ut labore et dolore magna
-aliqua ut enim ad minim veniam quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat
-duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur excepteur
-sint occaecat cupidatat non proident sunt in culpa qui officia deserunt mollit anim id est
-laborum ipsum dolor sit amet consectetur adipiscing elit sed do eiusmod tempor incididunt ut labore et dolore magna
-aliqua ut enim ad minim veniam quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat
-duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur excepteur
-sint occaecat cupidatat non proident sunt in culpa qui officia deserunt mollit anim id est
-`
-
-func BenchmarkTokens(b *testing.B) {
-	var (
-		v2Three      = NewNGramTokenizer(3, 0)
-		v2ThreeSkip1 = NewNGramTokenizer(3, 1)
-	)
-
-	type impl struct {
-		desc string
-		f    func()
-	}
-	type tc struct {
-		desc  string
-		impls []impl
-	}
-	for _, tc := range []tc{
-		{
-			desc: "three",
-			impls: []impl{
-				{
-					desc: "v2",
-					f: func() {
-						itr := v2Three.Tokens(lorem)
-						for itr.Next() {
-							_ = itr.At()
-						}
-					},
-				},
-			},
-		},
-		{
-			desc: "threeSkip1",
-			impls: []impl{
-				{
-					desc: "v2",
-					f: func() {
-						itr := v2ThreeSkip1.Tokens(lorem)
-						for itr.Next() {
-							_ = itr.At()
-						}
-					},
-				},
-			},
-		},
-		{
-			desc: "threeChunk",
-			impls: []impl{
-				{
-					desc: "v2",
-					f: func() func() {
-						buf, prefixLn := prefixedToken(v2Three.N(), ChunkRef{}, nil)
-						return func() {
-							itr := NewPrefixedTokenIter(buf, prefixLn, v2Three.Tokens(lorem))
-							for itr.Next() {
-								_ = itr.At()
-							}
-						}
-					}(),
-				},
-			},
-		},
-		{
-			desc: "threeSkip1Chunk",
-			impls: []impl{
-				{
-					desc: "v2",
-					f: func() func() {
-						buf, prefixLn := prefixedToken(v2Three.N(), ChunkRef{}, nil)
-						return func() {
-							itr := NewPrefixedTokenIter(buf, prefixLn, v2ThreeSkip1.Tokens(lorem))
-							for itr.Next() {
-								_ = itr.At()
-							}
-						}
-					}(),
-				},
-			},
-		},
-	} {
-		b.Run(tc.desc, func(b *testing.B) {
-			for _, impl := range tc.impls {
-				b.Run(impl.desc, func(b *testing.B) {
-					for i := 0; i < b.N; i++ {
-						impl.f()
-					}
-				})
-			}
-		})
-	}
-}
-
 func TestStructuredMetadataTokenizer(t *testing.T) {
 	tokenizer := NewStructuredMetadataTokenizer("chunk")