Remove ngram tokenizer
Signed-off-by: Christian Haudum <[email protected]>
chaudum committed Sep 19, 2024
1 parent 05e5d45 commit b53b846
Showing 5 changed files with 12 additions and 429 deletions.
22 changes: 0 additions & 22 deletions pkg/storage/bloom/v1/bloom_tokenizer.go
@@ -45,28 +45,6 @@ func NewBloomTokenizer(maxBloomSize int, metrics *Metrics, logger log.Logger) *B
}
}

// prefixedToken returns a byte slice with sufficient capacity for a chunk-ref prefixed token
// of specific ngram length, along with the length of the prefix.
// It ensures enough capacity for the prefix and the token so additional tokens can be created
// without allocations by appending them to the prefix length
// If the buffer is nil or too small, a new one is created. The buffer is returned for reuse.
func prefixedToken(ngram int, chk ChunkRef, buf []byte) ([]byte, int) {
    enc := encoding.EncWith(buf)
    enc.Reset()
    enc.PutBE64(uint64(chk.From))
    enc.PutBE64(uint64(chk.Through))
    enc.PutBE32(chk.Checksum)
    prefixLn := enc.Len() // record the length of the prefix

    // If the buffer is too small, ensure enough capacity for the ngram
    if cap(enc.Get()) < prefixLn+ngram*MaxRuneLen {
        enc.PutBytes(make([]byte, ngram*MaxRuneLen))
    }

    // return the underlying byte slice and the length of the prefix
    return enc.Get(), prefixLn
}

// ChunkRefWithIter is a wrapper around a ChunkRef and an EntryIterator.
type ChunkRefWithIter struct {
Ref ChunkRef
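The removed prefixedToken helper reserved a fixed 20-byte chunk-ref prefix (8 bytes From, 8 bytes Through, 4 bytes Checksum) plus ngram*MaxRuneLen bytes of headroom for the n-gram itself. As a worked example of that arithmetic, a hypothetical test (not part of this commit) exercising the removed helper would assert:

    func TestPrefixedTokenCapacity(t *testing.T) {
        // hypothetical chunk ref; the lengths below do not depend on its values
        chk := ChunkRef{From: 0, Through: 1, Checksum: 2}
        // prefix = 8 (From) + 8 (Through) + 4 (Checksum) = 20 bytes
        // total  = prefix + ngram*MaxRuneLen = 20 + 4*4 = 36 bytes for a 4-gram
        buf, prefixLn := prefixedToken(4, chk, nil)
        require.Equal(t, 20, prefixLn)
        require.Equal(t, 36, len(buf))
    }

The removed TestPrefixedKeyCreation in the next file asserted exactly these lengths.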
57 changes: 6 additions & 51 deletions pkg/storage/bloom/v1/bloom_tokenizer_test.go
@@ -20,62 +20,14 @@ import (

"github.com/grafana/loki/pkg/push"

"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"

"github.com/grafana/loki/v3/pkg/storage/bloom/v1/filter"

"github.com/prometheus/client_golang/prometheus"
)

var (
    four = NewNGramTokenizer(4, 0)
    metrics = NewMetrics(prometheus.DefaultRegisterer)
)

func TestPrefixedKeyCreation(t *testing.T) {
    t.Parallel()
    var ones uint64 = 0xffffffffffffffff

    ref := ChunkRef{
        From: 0,
        Through: model.Time(int64(ones)),
        Checksum: 0xffffffff,
    }
    for _, tc := range []struct {
        desc string
        ngram, expLen int
    }{
        {
            desc: "0-gram",
            ngram: 0,
            expLen: 20,
        },
        {
            desc: "4-gram",
            ngram: 4,
            expLen: 20 + 4*MaxRuneLen,
        },
    } {
        t.Run(tc.desc, func(t *testing.T) {
            token, prefixLn := prefixedToken(tc.ngram, ref, nil)
            require.Equal(t, 20, prefixLn)
            require.Equal(t, tc.expLen, len(token))
            // first 8 bytes should be zeros from `from`
            for i := 0; i < 8; i++ {
                require.Equal(t, byte(0), token[i])
            }
            // next 8 bytes should be ones from `through`
            for i := 8; i < 16; i++ {
                require.Equal(t, byte(255), token[i])
            }
            // next 4 bytes should be ones from `checksum`
            for i := 16; i < 20; i++ {
                require.Equal(t, byte(255), token[i])
            }
        })
    }
}
var metrics = NewMetrics(prometheus.DefaultRegisterer)

func TestTokenizerPopulate(t *testing.T) {
    t.Parallel()
@@ -249,15 +201,18 @@ func populateAndConsumeBloom(

func BenchmarkPopulateSeriesWithBloom(b *testing.B) {
    for i := 0; i < b.N; i++ {
        var testLine = lorem + lorem + lorem
        bt := NewBloomTokenizer(0, metrics, logger.NewNopLogger())

        sbf := filter.NewScalableBloomFilter(1024, 0.01, 0.8)

        memChunk := chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, compression.EncSnappy, chunkenc.ChunkHeadFormatFor(chunkenc.ChunkFormatV4), 256000, 1500000)
        _, _ = memChunk.Append(&push.Entry{
            Timestamp: time.Unix(0, 1),
            Line: testLine,
            Line: "",
            StructuredMetadata: push.LabelsAdapter{
                push.LabelAdapter{Name: "trace_id", Value: fmt.Sprintf("%04x", i)},
                push.LabelAdapter{Name: "org_id", Value: fmt.Sprintf("%d", i%1000)},
            },
        })
        itr, err := memChunk.Iterator(
            context.Background(),
16 changes: 6 additions & 10 deletions pkg/storage/bloom/v1/fuse_test.go
@@ -167,20 +167,20 @@ func TestFusedQuerier_MultiPage(t *testing.T) {
Chunks: []ChunkRef{chk},
}

buf, prefixLn := prefixedToken(3, chk, nil)
buf := prefixForChunkRef(chk)

b1 := &Bloom{
*filter.NewScalableBloomFilter(1024, 0.01, 0.8),
}
key1, key2 := []byte("foo"), []byte("bar")
b1.Add(key1)
b1.Add(append(buf[:prefixLn], key1...))
b1.Add(append(buf, key1...))

b2 := &Bloom{
*filter.NewScalableBloomFilter(1024, 0.01, 0.8),
}
b2.Add(key2)
b2.Add(append(buf[:prefixLn], key2...))
b2.Add(append(buf, key2...))

_, err = builder.BuildFrom(v2.NewSliceIter([]SeriesWithBlooms{
{
@@ -257,7 +257,7 @@ func TestLazyBloomIter_Seek_ResetError(t *testing.T) {

numSeries := 4
data := make([]SeriesWithBlooms, 0, numSeries)
tokenizer := NewNGramTokenizer(4, 0)

for i := 0; i < numSeries; i++ {
var series Series
series.Fingerprint = model.Fingerprint(i)
@@ -280,12 +280,8 @@ func TestLazyBloomIter_Seek_ResetError(t *testing.T) {
}

for j := 0; j < nLines; j++ {
line := fmt.Sprintf("%04x:%04x", i, j)
it := tokenizer.Tokens(line)
for it.Next() {
key := it.At()
bloom.Add(key)
}
key := fmt.Sprintf("%04x:%04x", i, j)
bloom.Add([]byte(key))
}

data = append(data, SeriesWithBlooms{
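In the hunks above, prefixedToken(3, chk, nil) and the separately tracked prefix length are replaced by prefixForChunkRef(chk), whose definition is not among the hunks shown on this page. A minimal sketch of what it plausibly looks like, assuming it keeps the 20-byte big-endian chunk-ref encoding of the removed prefixedToken (an inference, not the committed code):

    // Sketch only: assumes prefixForChunkRef mirrors the removed prefixedToken's
    // chunk-ref encoding (From and Through as big-endian uint64, Checksum as
    // big-endian uint32, 20 bytes in total).
    func prefixForChunkRef(chk ChunkRef) []byte {
        enc := encoding.EncWith(make([]byte, 0, 20))
        enc.PutBE64(uint64(chk.From))    // 8 bytes
        enc.PutBE64(uint64(chk.Through)) // 8 bytes
        enc.PutBE32(chk.Checksum)        // 4 bytes
        return enc.Get()
    }

Because the returned slice is exactly the prefix, callers can build chunk-scoped keys with append(buf, key...) instead of slicing buf[:prefixLn], which is the change visible in the b1.Add and b2.Add lines.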
121 changes: 0 additions & 121 deletions pkg/storage/bloom/v1/tokenizer.go
@@ -2,17 +2,12 @@ package v1

import (
"fmt"
"unicode/utf8"

iter "github.com/grafana/loki/v3/pkg/iter/v2"

"github.com/grafana/loki/pkg/push"
)

const (
MaxRuneLen = 4
)

type StructuredMetadataTokenizer struct {
// prefix to add to tokens, typically the encoded chunkref
prefix string
@@ -26,7 +21,6 @@ func NewStructuredMetadataTokenizer(prefix string) *StructuredMetadataTokenizer
}
}

// Tokens implements the NGramBuilder interface
func (t *StructuredMetadataTokenizer) Tokens(kv push.LabelAdapter) iter.Iterator[string] {
combined := fmt.Sprintf("%s=%s", kv.Name, kv.Value)
t.tokens = append(t.tokens[:0],
@@ -36,118 +30,3 @@ func (t *StructuredMetadataTokenizer) Tokens(kv push.LabelAdapter) iter.Iterator
)
return iter.NewSliceIter(t.tokens)
}
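After this commit the structured-metadata tokenizer is the remaining token source. The exact set of token variants appended inside Tokens is collapsed in this view, but the visible API is enough for a minimal usage sketch (the prefix and label values are illustrative, not taken from the diff):

    // Minimal usage sketch; "chunk-prefix" stands in for the encoded chunk ref
    // that the struct comment above says is typically used as the prefix.
    tok := NewStructuredMetadataTokenizer("chunk-prefix")
    it := tok.Tokens(push.LabelAdapter{Name: "trace_id", Value: "abcd"})
    for it.Next() {
        fmt.Println(it.At()) // includes the combined "trace_id=abcd" form, among others
    }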

func reassemble(buf []rune, ln, pos int, result []byte) []byte {
    result = result[:0] // Reset the result slice
    for i := 0; i < ln; i++ {
        cur := pos % len(buf)
        pos++
        result = utf8.AppendRune(result, buf[cur])
    }
    return result
}

// Iterable variants (more performant, less space)
type NGramTokenizer struct {
    n, skip int
    buffer []rune // circular buffer used for ngram generation
    res []byte // buffer used for token generation
}

func (t *NGramTokenizer) N() int {
    return t.n
}

func (t *NGramTokenizer) SkipFactor() int {
    return t.skip
}

/*
N-Grams (https://en.wikipedia.org/wiki/N-gram) are a series of 'n' adjacent characters in a string.
These will be utilized for the bloom filters to allow for fuzzy searching.
*/
func NewNGramTokenizer(n, skip int) *NGramTokenizer {
    t := &NGramTokenizer{
        n: n,
        skip: skip,
        buffer: make([]rune, n+skip),
        res: make([]byte, 0, n*MaxRuneLen), // maximum 4 bytes per rune
    }

    return t
}

// Token implements the NGramBuilder interface
// The Token iterator uses shared buffers for performance. The []byte returned by At()
// is not safe for use after subsequent calls to Next()
func (t *NGramTokenizer) Tokens(line string) iter.Iterator[[]byte] {
    return &NGramTokenIter{
        n: t.N(),
        skip: t.SkipFactor(),

        line: line,

        buffer: t.buffer,
        res: t.res,
    }
}

type NGramTokenIter struct {
    n, skip int

    runeIndex, offset int
    line string // source

    buffer []rune // circular buffers used for ngram generation
    res []byte
}

func (t *NGramTokenIter) Next() bool {
    for i, r := range t.line[t.offset:] {
        t.buffer[t.runeIndex%len(t.buffer)] = r
        t.runeIndex++

        if t.runeIndex < t.n {
            continue
        }

        // if the start of the ngram is at the interval of our skip factor, emit it.
        // we increment the skip due to modulo logic:
        // because `n % 0 is a divide by zero and n % 1 is always 0`
        if (t.runeIndex-t.n)%(t.skip+1) == 0 {
            // update the offset, but don't go past the end of the line;
            // for instance invalid utf-8
            t.offset = min(len(t.line), t.offset+i+utf8.RuneLen(r))
            return true
        }

    }
    return false
}

func (t *NGramTokenIter) At() []byte {
    return reassemble(t.buffer, t.n, (t.runeIndex-t.n)%len(t.buffer), t.res[:0])
}

func (t *NGramTokenIter) Err() error {
    return nil
}

type PrefixedTokenIter struct {
    buf []byte
    prefixLen int

    iter.Iterator[[]byte]
}

func (t *PrefixedTokenIter) At() []byte {
    return append(t.buf[:t.prefixLen], t.Iterator.At()...)
}

func NewPrefixedTokenIter(buf []byte, prefixLn int, itr iter.Iterator[[]byte]) *PrefixedTokenIter {
    return &PrefixedTokenIter{
        buf: buf,
        prefixLen: prefixLn,
        Iterator: itr,
    }
}
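For context on what is removed here: the n-gram tokenizer emitted every window of n adjacent runes from a line (optionally skipping), which is what allowed fuzzy matching of search terms against the blooms. A short illustration of the removed behavior, using the removed API exactly as defined above:

    // Illustrates the removed tokenizer; this is not code added by the commit.
    t := NewNGramTokenizer(3, 0) // 3-grams, no skip factor
    it := t.Tokens("abcde")
    for it.Next() {
        fmt.Printf("%s ", it.At()) // prints: abc bcd cde
    }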