diff --git a/pkg/storage/bloom/v1/bloom_tester.go b/pkg/storage/bloom/v1/bloom_tester.go
index 760c3dfac27f4..6fe00a4cd1730 100644
--- a/pkg/storage/bloom/v1/bloom_tester.go
+++ b/pkg/storage/bloom/v1/bloom_tester.go
@@ -2,15 +2,8 @@ package v1
 
 import (
 	"fmt"
-	"unicode/utf8"
 	"unsafe"
 
-	"github.com/grafana/regexp"
-
-	iter "github.com/grafana/loki/v3/pkg/iter/v2"
-	"github.com/grafana/loki/v3/pkg/logql/log"
-	"github.com/grafana/loki/v3/pkg/logql/log/pattern"
-	"github.com/grafana/loki/v3/pkg/logql/syntax"
 	"github.com/grafana/loki/v3/pkg/storage/bloom/v1/filter"
 )
 
@@ -39,228 +32,20 @@ func (b BloomTests) MatchesWithPrefixBuf(bloom filter.Checker, buf []byte, prefi
 	return true
 }
 
-// ExtractTestableLineFilters extracts all line filters from an expression
-// that can be tested against a bloom filter. This will skip any line filters
-// after a line format expression. A line format expression might add content
-// that the query later matches against, which can't be tested with a bloom filter.
-// E.g. For {app="fake"} |= "foo" | line_format "thisNewTextShouldMatch" |= "thisNewTextShouldMatch"
-// this function will return only the line filter for "foo" since the line filter for "thisNewTextShouldMatch"
-// wouldn't match against the bloom filter but should match against the query.
-func ExtractTestableLineFilters(expr syntax.Expr) []syntax.LineFilterExpr {
-	if expr == nil {
-		return nil
-	}
-
-	var filters []syntax.LineFilterExpr
-	var lineFmtFound bool
-	visitor := &syntax.DepthFirstTraversal{
-		VisitLineFilterFn: func(_ syntax.RootVisitor, e *syntax.LineFilterExpr) {
-			if e != nil && !lineFmtFound {
-				filters = append(filters, *e)
-			}
-		},
-		VisitLineFmtFn: func(_ syntax.RootVisitor, e *syntax.LineFmtExpr) {
-			if e != nil {
-				lineFmtFound = true
-			}
-		},
-	}
-	expr.Accept(visitor)
-	return filters
-}
-
-// FiltersToBloomTest converts a list of line filters to a BloomTest.
-// Note that all the line filters should be testable against a bloom filter.
-// Use ExtractTestableLineFilters to extract testable line filters from an expression.
-// TODO(owen-d): limits the number of bloom lookups run.
-// An arbitrarily high number can overconsume cpu and is a DoS vector.
-// TODO(owen-d): use for loop not recursion to protect callstack
-func FiltersToBloomTest(b NGramBuilder, filters ...syntax.LineFilterExpr) BloomTest {
-	tests := make(BloomTests, 0, len(filters))
-	for _, f := range filters {
-		if f.Left != nil {
-			tests = append(tests, FiltersToBloomTest(b, *f.Left))
-		}
-		if f.Or != nil {
-			left := FiltersToBloomTest(b, *f.Or)
-			right := simpleFilterToBloomTest(b, f.LineFilter)
-			tests = append(tests, newOrTest(left, right))
-			continue
-		}
-
-		tests = append(tests, simpleFilterToBloomTest(b, f.LineFilter))
-	}
-	return tests
-}
-
-func simpleFilterToBloomTest(b NGramBuilder, filter syntax.LineFilter) BloomTest {
-	switch filter.Ty {
-	case log.LineMatchNotEqual, log.LineMatchNotRegexp, log.LineMatchNotPattern:
-		// We cannot test _negated_ filters with a bloom filter since blooms are probabilistic
-		// filters that can only tell us if a string _might_ exist.
-		// For example, for `!= "foo"`, the bloom filter might tell us that the string "foo" might exist
-		// but because we are not sure, we cannot discard that chunk because it might actually not be there.
-		// Therefore, we return a test that always returns true.
-		return MatchAll
-	case log.LineMatchEqual:
-		return newStringTest(b, filter.Match)
-	case log.LineMatchRegexp:
-		return MatchAll
-	case log.LineMatchPattern:
-		return newPatternTest(b, filter.Match)
-	default:
-		return MatchAll
-	}
-}
-
-type bloomCheckerWrapper struct {
-	bloom filter.Checker
-}
-
-// Test implements the log.Checker interface
-func (b bloomCheckerWrapper) Test(line []byte, _ bool, _ bool) bool {
-	return b.bloom.Test(line)
-}
-
-// TestRegex implements the log.Checker interface
-func (b bloomCheckerWrapper) TestRegex(_ *regexp.Regexp) bool {
-	// We won't support regexes in bloom filters so we just return true
-	return true
-}
-
-type logCheckerWrapper struct {
-	checker log.Checker
-}
-
-// Test implements the filter.Checker interface
-func (l logCheckerWrapper) Test(data []byte) bool {
-	return l.checker.Test(data, true, false)
-}
-
-type matcherFilterWrapper struct {
-	filter log.Matcher
-}
-
-func (m matcherFilterWrapper) Matches(bloom filter.Checker) bool {
-	return m.filter.Matches(bloomCheckerWrapper{bloom})
-}
-
-func (m matcherFilterWrapper) MatchesWithPrefixBuf(bloom filter.Checker, buf []byte, prefixLen int) bool {
-	return m.filter.Matches(bloomCheckerWrapper{prefixedChecker{
-		checker:   bloom,
-		buf:       buf,
-		prefixLen: prefixLen,
-	}})
-}
-
-type prefixedChecker struct {
-	checker   filter.Checker
-	buf       []byte
-	prefixLen int
-}
-
-func (p prefixedChecker) Test(data []byte) bool {
-	return p.checker.Test(append(p.buf[:p.prefixLen], data...))
-}
-
 type matchAllTest struct{}
 
 var MatchAll = matchAllTest{}
 
+// Matches implements BloomTest
 func (n matchAllTest) Matches(_ filter.Checker) bool {
 	return true
 }
 
+// MatchesWithPrefixBuf implements BloomTest
 func (n matchAllTest) MatchesWithPrefixBuf(_ filter.Checker, _ []byte, _ int) bool {
 	return true
 }
 
-// NGramBuilder is an interface for tokenizing strings into ngrams
-// Extracting this interface allows us to test the bloom filter without having to use the actual tokenizer
-// TODO: This should be moved to tokenizer.go
-type NGramBuilder interface {
-	Tokens(line string) iter.Iterator[[]byte]
-	N() int
-	SkipFactor() int
-}
-
-type stringTest struct {
-	ngrams [][]byte
-}
-
-func newStringTest(b NGramBuilder, search string) (res BloomTest) {
-	// search string must be longer than the combined ngram length and skip factor
-	// in order for all possible skip offsets to have at least 1 ngram
-	skip := b.SkipFactor()
-	if ct := utf8.RuneCountInString(search); ct < b.N()+skip {
-		return MatchAll
-	}
-
-	tests := make([]stringTest, 0, skip)
-
-	for i := 0; i < skip+1; i++ {
-		searchWithOffset := search
-		for j := 0; j < i; j++ {
-			_, size := utf8.DecodeRuneInString(searchWithOffset)
-			// NB(owen-d): small bounds check for invalid utf8
-			searchWithOffset = searchWithOffset[min(size, len(searchWithOffset)):]
-		}
-
-		var test stringTest
-		it := b.Tokens(searchWithOffset)
-		for it.Next() {
-			ngram := make([]byte, len(it.At()))
-			copy(ngram, it.At())
-			test.ngrams = append(test.ngrams, ngram)
-		}
-		tests = append(tests, test)
-	}
-
-	res = tests[0]
-	for _, t := range tests[1:] {
-		res = newOrTest(res, t)
-	}
-	return res
-}
-
-// Matches implements the BloomTest interface
-func (b stringTest) Matches(bloom filter.Checker) bool {
-	for _, ngram := range b.ngrams {
-		if !bloom.Test(ngram) {
-			return false
-		}
-	}
-	return true
-}
-
-// MatchesWithPrefixBuf implements the BloomTest interface
-func (b stringTest) MatchesWithPrefixBuf(bloom filter.Checker, buf []byte, prefixLen int) bool {
-	for _, ngram := range b.ngrams {
-		buf = append(buf[:prefixLen], ngram...)
-		if !bloom.Test(buf) {
-			return false
-		}
-	}
-	return true
-}
-
-type stringMatcherFilter struct {
-	test BloomTest
-}
-
-// Matches implements the log.Filterer interface
-func (b stringMatcherFilter) Matches(test log.Checker) bool {
-	return b.test.Matches(logCheckerWrapper{test})
-}
-
-func newStringFilterFunc(b NGramBuilder) log.NewMatcherFiltererFunc {
-	return func(match []byte, _ bool) log.MatcherFilterer {
-		return log.WrapMatcher(stringMatcherFilter{
-			test: newStringTest(b, string(match)),
-		})
-	}
-}
-
 type orTest struct {
 	left, right BloomTest
 }
@@ -286,10 +71,12 @@ func newOrTest(left, right BloomTest) orTest {
 	}
 }
 
+// Matches implements BloomTest
 func (o orTest) Matches(bloom filter.Checker) bool {
 	return o.left.Matches(bloom) || o.right.Matches(bloom)
 }
 
+// MatchesWithPrefixBuf implements BloomTest
 func (o orTest) MatchesWithPrefixBuf(bloom filter.Checker, buf []byte, prefixLen int) bool {
 	return o.left.MatchesWithPrefixBuf(bloom, buf, prefixLen) || o.right.MatchesWithPrefixBuf(bloom, buf, prefixLen)
 }
@@ -305,28 +92,16 @@ func newAndTest(left, right BloomTest) andTest {
 	}
 }
 
+// Matches implements BloomTest
 func (a andTest) Matches(bloom filter.Checker) bool {
 	return a.left.Matches(bloom) && a.right.Matches(bloom)
 }
 
+// MatchesWithPrefixBuf implements BloomTest
 func (a andTest) MatchesWithPrefixBuf(bloom filter.Checker, buf []byte, prefixLen int) bool {
 	return a.left.MatchesWithPrefixBuf(bloom, buf, prefixLen) && a.right.MatchesWithPrefixBuf(bloom, buf, prefixLen)
 }
 
-func newPatternTest(b NGramBuilder, match string) BloomTest {
-	lit, err := pattern.ParseLiterals(match)
-	if err != nil {
-		return MatchAll
-	}
-
-	var res BloomTests
-
-	for _, l := range lit {
-		res = append(res, newStringTest(b, string(l)))
-	}
-	return res
-}
-
 func LabelMatchersToBloomTest(matchers ...LabelMatcher) BloomTest {
 	tests := make(BloomTests, 0, len(matchers))
 	for _, matcher := range matchers {
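What survives in bloom_tester.go is the small combinator algebra: MatchAll, orTest, andTest, and BloomTests (AND over a slice). Below is a minimal, self-contained sketch of how these pieces compose against a checker — it is not part of the patch; checker, memChecker, and keyTest are illustrative stand-ins for filter.Checker, a real bloom, and the singleKeyTest added in fuse_test.go further down.

package main

import "fmt"

// checker mirrors filter.Checker: a probabilistic set that can only answer
// "might contain" (true) or "definitely does not contain" (false).
type checker interface {
	Test(data []byte) bool
}

// memChecker is a hypothetical exact-match stand-in for a bloom filter.
type memChecker map[string]struct{}

func (m memChecker) Test(data []byte) bool {
	_, ok := m[string(data)]
	return ok
}

// keyTest probes the bloom for a single key, like singleKeyTest below.
type keyTest []byte

func (k keyTest) Matches(bloom checker) bool { return bloom.Test(k) }

// orTest mirrors the combinator kept in bloom_tester.go: a chunk must be
// kept if either side might match.
type orTest struct{ left, right interface{ Matches(checker) bool } }

func (o orTest) Matches(bloom checker) bool {
	return o.left.Matches(bloom) || o.right.Matches(bloom)
}

func main() {
	bloom := memChecker{"trace_id": {}}
	test := orTest{left: keyTest("trace_id"), right: keyTest("span_id")}
	fmt.Println(test.Matches(bloom)) // true: the left branch short-circuits the OR
}

Because blooms are probabilistic, false only ever means "definitely absent"; that is why the negated filters handled above must fall back to MatchAll rather than rule a chunk out.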
diff --git a/pkg/storage/bloom/v1/bloom_tester_test.go b/pkg/storage/bloom/v1/bloom_tester_test.go
index 8c400947ad702..7a314872cc86e 100644
--- a/pkg/storage/bloom/v1/bloom_tester_test.go
+++ b/pkg/storage/bloom/v1/bloom_tester_test.go
@@ -10,116 +10,6 @@ import (
 	"github.com/grafana/loki/pkg/push"
 )
 
-type fakeLineBloom []string
-
-// fakeBloom is a fake bloom filter that matches tokens exactly.
-// It uses a tokenizer to build the tokens for a line
-func newFakeBloom(tokenizer *NGramTokenizer, line string) (res fakeLineBloom) {
-	toks := tokenizer.Tokens(line)
-	for toks.Next() {
-		res = append(res, string(toks.At()))
-	}
-	return
-}
-
-func (f fakeLineBloom) Test(data []byte) bool {
-	str := string(data)
-	for _, match := range f {
-		if str == match {
-			return true
-		}
-	}
-	return false
-}
-
-func TestBloomQueryingLogic(t *testing.T) {
-	// All tested on 4skip1
-	n := 4
-	skip := 1
-	tokenizer := NewNGramTokenizer(n, skip)
-
-	for _, tc := range []struct {
-		desc    string
-		line    string
-		query   string
-		match   bool
-		enabled bool
-	}{
-		{
-			desc:  "filter too short always match",
-			line:  "foobar",
-			query: `{app="fake"} |= "z"`,
-			match: true,
-		},
-		{
-			desc:  "simple matcher",
-			line:  "foobar",
-			query: `{app="fake"} |= "oobar"`,
-			match: true,
-		},
-		{
-			desc:  "longer sequence",
-			line:  "abcdefghijklmnopqrstuvwxyz",
-			query: `{app="fake"} |= "nopqrstuvwxyz"`,
-			match: true,
-		},
-		{
-			desc:  "longer sequence nomatch",
-			line:  "abcdefghijklmnopqrstuvwxyz",
-			query: `{app="fake"} |= "nopqrstuvwxyzzz"`,
-			match: false,
-		},
-		{
-			desc:  "pattern simple",
-			line:  "abcdefghijklmnopqrstuvwxyz",
-			query: `{app="fake"} |> "<_>lmnopq<_>"`,
-			match: true,
-		},
-		{
-			desc:  "pattern too short matches",
-			line:  "abcdefghijklmnopqrstuvwxyz",
-			query: `{app="fake"} |> "<_>zzz<_>"`,
-			match: true,
-		},
-		{
-			desc:  "pattern mix long success and short",
-			line:  "abcdefghijklmnopqrstuvwxyz",
-			query: `{app="fake"} |> "<_>lmnop<_>zzz<_>"`,
-			match: true,
-		},
-		{
-			desc:  "pattern mix long fail and short",
-			line:  "abcdefghijklmnopqrstuvwxyz",
-			query: `{app="fake"} |> "<_>zzzzz<_>zzz<_>"`,
-			match: false,
-		},
-		{
-			desc:  "regexp disabled",
-			line:  "foobarbaz",
-			query: `{app="fake"} |~ "(aaaaa|bbbbb)bazz"`,
-			match: true,
-		},
-	} {
-
-		// shortcut to enable specific tests
-		tc.enabled = true
-		if !tc.enabled {
-			continue
-		}
-		t.Run(tc.desc, func(t *testing.T) {
-			bloom := newFakeBloom(tokenizer, tc.line)
-			expr, err := syntax.ParseExpr(tc.query)
-			require.NoError(t, err)
-			filters := ExtractTestableLineFilters(expr)
-			bloomTests := FiltersToBloomTest(tokenizer, filters...)
-			matched := bloomTests.Matches(bloom)
-
-			require.Equal(t, tc.match, matched)
-
-		})
-	}
-}
-
 func TestLabelMatchersToBloomTest(t *testing.T) {
 	// All test cases below have access to a fake bloom filter with
 	// trace_id=exists_1 and trace_id=exists_2
@@ -219,7 +109,7 @@ type fakeMetadataBloom []string
 
 // fakeBloom is a fake bloom filter that matches tokens exactly.
 // It uses a tokenizer to build the tokens for a line
-func newFakeMetadataBloom(tokenizer *StructuredMetadataTokenizer, kvs ...push.LabelAdapter) (res fakeLineBloom) {
+func newFakeMetadataBloom(tokenizer *StructuredMetadataTokenizer, kvs ...push.LabelAdapter) (res fakeMetadataBloom) {
 	for _, kv := range kvs {
 		it := tokenizer.Tokens(kv)
 		for it.Next() {
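The remaining tests drive BloomTest implementations through an exact-match fake rather than a real bloom filter. A minimal sketch of that pattern, assuming only what the diff shows — fakeBloom and its tokens are illustrative here; the real type is fakeMetadataBloom, fed by a StructuredMetadataTokenizer:

package main

import "fmt"

// fakeBloom satisfies filter.Checker by exact string comparison, so
// BloomTest implementations can be exercised without a real bloom filter.
type fakeBloom []string

func (f fakeBloom) Test(data []byte) bool {
	for _, tok := range f {
		if string(data) == tok {
			return true
		}
	}
	return false
}

func main() {
	bloom := fakeBloom{"trace_id=exists_1", "trace_id=exists_2"}
	fmt.Println(bloom.Test([]byte("trace_id=exists_1"))) // true: token might be in the chunk
	fmt.Println(bloom.Test([]byte("trace_id=missing")))  // false: the chunk can be skipped
}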
diff --git a/pkg/storage/bloom/v1/fuse_test.go b/pkg/storage/bloom/v1/fuse_test.go
index b246333021635..2b28256629d48 100644
--- a/pkg/storage/bloom/v1/fuse_test.go
+++ b/pkg/storage/bloom/v1/fuse_test.go
@@ -4,7 +4,6 @@ import (
 	"bytes"
 	"context"
 	"fmt"
-	"math"
 	"sync"
 	"testing"
 
@@ -25,26 +24,26 @@ var BloomPagePool = mempool.New("test", []mempool.Bucket{
 	{Size: 16, Capacity: 512 << 10},
 }, nil)
 
-// TODO(owen-d): this is unhinged from the data it represents. I'm leaving this solely so I don't
-// have to refactor tests here in order to fix this elsewhere, but it can/should be fixed --
-// the skip & n len are hardcoded based on data that's passed to it elsewhere.
-// TODO(chaudum): Can be removed once matching with structured metadata is implemented.
-type fakeNgramBuilder struct{}
+type singleKeyTest []byte
 
-func (f fakeNgramBuilder) N() int          { return math.MaxInt } // do not tokenize
-func (f fakeNgramBuilder) SkipFactor() int { return 0 }
+// Matches implements BloomTest.
+func (s singleKeyTest) Matches(bloom filter.Checker) bool {
+	return bloom.Test(s)
+}
 
-func (f fakeNgramBuilder) Tokens(key string) v2.Iterator[[]byte] {
-	return v2.NewSliceIter[[]byte]([][]byte{[]byte(key)})
+// MatchesWithPrefixBuf implements BloomTest.
+func (s singleKeyTest) MatchesWithPrefixBuf(bloom filter.Checker, buf []byte, prefixLen int) bool {
+	return bloom.Test(append(buf[:prefixLen], s...))
 }
 
+// compiler check
+var _ BloomTest = singleKeyTest("")
+
 func keysToBloomTest(keys [][]byte) BloomTest {
-	var tokenizer fakeNgramBuilder
 	tests := make(BloomTests, 0, len(keys))
 	for _, key := range keys {
-		tests = append(tests, newStringTest(tokenizer, string(key)))
+		tests = append(tests, singleKeyTest(key))
 	}
-
 	return tests
 }
@@ -55,7 +54,7 @@ func TestFusedQuerier(t *testing.T) {
 	writer := NewMemoryBlockWriter(indexBuf, bloomsBuf)
 	reader := NewByteReader(indexBuf, bloomsBuf)
 	numSeries := 1000
-	data, keys := MkBasicSeriesWithBlooms(numSeries, 0x0000, 0xffff, 0, 10000)
+	data, _ := MkBasicSeriesWithBlooms(numSeries, 0x0000, 0xffff, 0, 10000)
 
 	builder, err := NewBlockBuilder(
 		BlockOptions{
@@ -91,7 +90,7 @@ func TestFusedQuerier(t *testing.T) {
 				Fp:       data[idx].Series.Fingerprint,
 				Chks:     data[idx].Series.Chunks,
 				Response: ch,
-				Search:   keysToBloomTest(keys[idx]),
+				Search:   singleKeyTest("trace_id"),
 			})
 		}
 		inputs = append(inputs, reqs)
@@ -132,20 +131,13 @@ func TestFusedQuerier(t *testing.T) {
 	for i, input := range inputs {
 		for j, req := range input {
 			resp := resps[i][j]
-			require.Equal(
-				t,
-				Output{
-					Fp:       req.Fp,
-					Removals: nil,
-				},
-				resp,
-			)
+			require.Equal(t, Output{Fp: req.Fp, Removals: nil}, resp)
 		}
 	}
 }
 
 // Successfully query series across multiple pages as well as series that only occupy 1 bloom
-func TestFuseMultiPage(t *testing.T) {
+func TestFusedQuerier_MultiPage(t *testing.T) {
 	indexBuf := bytes.NewBuffer(nil)
 	bloomsBuf := bytes.NewBuffer(nil)
 	writer := NewMemoryBlockWriter(indexBuf, bloomsBuf)
@@ -215,13 +207,11 @@ func TestFuseMultiPage(t *testing.T) {
 		chans[i] = make(chan Output, 1) // buffered once to not block in test
 	}
 
-	req := func(ngram []byte, ch chan Output) Request {
+	req := func(key []byte, ch chan Output) Request {
 		return Request{
-			Fp:   fp,
-			Chks: []ChunkRef{chk},
-			Search: stringTest{
-				ngrams: [][]byte{ngram},
-			},
+			Fp:       fp,
+			Chks:     []ChunkRef{chk},
+			Search:   singleKeyTest(key),
 			Response: ch,
 			Recorder: NewBloomRecorder(context.Background(), "unknown"),
 		}
@@ -357,7 +347,7 @@ func TestLazyBloomIter_Seek_ResetError(t *testing.T) {
 	}
 }
 
-func TestFusedQuerierSkipsEmptyBlooms(t *testing.T) {
+func TestFusedQuerier_SkipsEmptyBlooms(t *testing.T) {
 	// references for linking in memory reader+writer
 	indexBuf := bytes.NewBuffer(nil)
 	bloomsBuf := bytes.NewBuffer(nil)
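For context on the replacement helper: singleKeyTest probes the bloom for exactly one key, and MatchesWithPrefixBuf reuses the caller's scratch buffer to prepend a prefix without allocating per probe. A standalone sketch of those mechanics under stated assumptions — equalChecker and the prefix literal are hypothetical, while the method body mirrors the one added above (with a local checker interface in place of filter.Checker):

package main

import "fmt"

type checker interface{ Test([]byte) bool }

// equalChecker is a hypothetical stand-in for a bloom filter that
// recognizes exactly one token.
type equalChecker string

func (c equalChecker) Test(data []byte) bool { return string(data) == string(c) }

// singleKeyTest, as added in fuse_test.go above.
type singleKeyTest []byte

func (s singleKeyTest) MatchesWithPrefixBuf(bloom checker, buf []byte, prefixLen int) bool {
	// Reuse buf's backing array: keep its first prefixLen bytes (e.g. a chunk
	// prefix) and append the key, so the bloom is probed for prefix+key
	// without a fresh allocation per probe. Callers must treat buf as scratch.
	return bloom.Test(append(buf[:prefixLen], s...))
}

func main() {
	prefix := "chunk-prefix/" // hypothetical; real prefixes are computed elsewhere
	buf := append(make([]byte, 0, 64), prefix...)

	test := singleKeyTest("trace_id")
	bloom := equalChecker("chunk-prefix/trace_id")
	fmt.Println(test.MatchesWithPrefixBuf(bloom, buf, len(prefix))) // true: probe is prefix+key
}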