Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Clean up unused bloom filter related code #14183

Merged
merged 6 commits into from
Sep 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 0 additions & 14 deletions docs/sources/shared/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -3713,20 +3713,6 @@ shard_streams:
# CLI flag: -bloom-build.split-keyspace-by
[bloom_split_series_keyspace_by: <int> | default = 256]

# Experimental. Length of the n-grams created when computing blooms from log
# lines.
# CLI flag: -bloom-build.ngram-length
[bloom_ngram_length: <int> | default = 4]

# Experimental. Skip factor for the n-grams created when computing blooms from
# log lines.
# CLI flag: -bloom-build.ngram-skip
[bloom_ngram_skip: <int> | default = 1]

# Experimental. Scalable Bloom Filter desired false-positive rate.
# CLI flag: -bloom-build.false-positive-rate
[bloom_false_positive_rate: <float> | default = 0.01]

# Experimental. Compression algorithm for bloom block pages.
# CLI flag: -bloom-build.block-encoding
[bloom_block_encoding: <string> | default = "none"]
Expand Down
4 changes: 1 addition & 3 deletions pkg/bloombuild/builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,11 +340,9 @@ func (b *Builder) processTask(

var (
blockCt int
nGramSize = uint64(b.limits.BloomNGramLength(tenant))
nGramSkip = uint64(b.limits.BloomNGramSkip(tenant))
maxBlockSize = uint64(b.limits.BloomMaxBlockSize(tenant))
maxBloomSize = uint64(b.limits.BloomMaxBloomSize(tenant))
blockOpts = v1.NewBlockOptions(blockEnc, nGramSize, nGramSkip, maxBlockSize, maxBloomSize)
blockOpts = v1.NewBlockOptions(blockEnc, maxBlockSize, maxBloomSize)
created []bloomshipper.Meta
totalSeries int
bytesAdded int
Expand Down
2 changes: 0 additions & 2 deletions pkg/bloombuild/builder/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,6 @@ func (cfg *Config) Validate() error {

// Limits is the set of per-tenant bloom settings consumed by this package.
// Every method takes the tenant ID and returns the value configured for that
// tenant.
type Limits interface {
// BloomBlockEncoding returns the tenant's bloom block encoding as a string.
BloomBlockEncoding(tenantID string) string
// BloomNGramLength returns the tenant's n-gram length setting.
BloomNGramLength(tenantID string) int
// BloomNGramSkip returns the tenant's n-gram skip setting.
BloomNGramSkip(tenantID string) int
// BloomMaxBlockSize returns the tenant's maximum block size.
// NOTE(review): unit is presumably bytes; confirm against the limits definition.
BloomMaxBlockSize(tenantID string) int
// BloomMaxBloomSize returns the tenant's maximum bloom size.
// NOTE(review): unit is presumably bytes; confirm against the limits definition.
BloomMaxBloomSize(tenantID string) int
}
2 changes: 0 additions & 2 deletions pkg/bloombuild/builder/spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,6 @@ func NewSimpleBloomGenerator(
reporter: reporter,

tokenizer: v1.NewBloomTokenizer(
opts.Schema.NGramLen(),
opts.Schema.NGramSkip(),
int(opts.UnencodedBlockOptions.MaxBloomSizeBytes),
metrics,
log.With(
Expand Down
8 changes: 4 additions & 4 deletions pkg/bloombuild/builder/spec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,13 +123,13 @@ func TestSimpleBloomGenerator(t *testing.T) {
}{
{
desc: "SkipsIncompatibleSchemas",
fromSchema: v1.NewBlockOptions(enc, 3, 0, maxBlockSize, 0),
toSchema: v1.NewBlockOptions(enc, 4, 0, maxBlockSize, 0),
fromSchema: v1.NewBlockOptions(enc, maxBlockSize, 0),
toSchema: v1.NewBlockOptions(enc, maxBlockSize, 0),
},
{
desc: "CombinesBlocks",
fromSchema: v1.NewBlockOptions(enc, 4, 0, maxBlockSize, 0),
toSchema: v1.NewBlockOptions(enc, 4, 0, maxBlockSize, 0),
fromSchema: v1.NewBlockOptions(enc, maxBlockSize, 0),
toSchema: v1.NewBlockOptions(enc, maxBlockSize, 0),
},
} {
t.Run(fmt.Sprintf("%s/%s", tc.desc, enc), func(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/bloombuild/planner/planner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ func genBlock(ref bloomshipper.BlockRef) (bloomshipper.Block, error) {
writer := v1.NewMemoryBlockWriter(indexBuf, bloomsBuf)
reader := v1.NewByteReader(indexBuf, bloomsBuf)

blockOpts := v1.NewBlockOptions(compression.EncNone, 4, 1, 0, 0)
blockOpts := v1.NewBlockOptions(compression.EncNone, 0, 0)

builder, err := v1.NewBlockBuilder(blockOpts, writer)
if err != nil {
Expand Down
29 changes: 0 additions & 29 deletions pkg/storage/bloom/spec.go

This file was deleted.

237 changes: 6 additions & 231 deletions pkg/storage/bloom/v1/bloom_tester.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,8 @@ package v1

import (
"fmt"
"unicode/utf8"
"unsafe"

"github.com/grafana/regexp"

iter "github.com/grafana/loki/v3/pkg/iter/v2"
"github.com/grafana/loki/v3/pkg/logql/log"
"github.com/grafana/loki/v3/pkg/logql/log/pattern"
"github.com/grafana/loki/v3/pkg/logql/syntax"
"github.com/grafana/loki/v3/pkg/storage/bloom/v1/filter"
)

Expand Down Expand Up @@ -39,228 +32,20 @@ func (b BloomTests) MatchesWithPrefixBuf(bloom filter.Checker, buf []byte, prefi
return true
}

// ExtractTestableLineFilters walks expr depth-first and returns every line
// filter that can be checked against a bloom filter.
//
// Collection stops at the first line format expression: a line_format stage
// may introduce text that later filters match on, and that text can never be
// found in a bloom built from the original log lines.
// E.g. for
//
//	{app="fake"} |= "foo" | line_format "thisNewTextShouldMatch" |= "thisNewTextShouldMatch"
//
// only the filter for "foo" is returned; the filter for
// "thisNewTextShouldMatch" must still match the query but would not match the
// bloom filter.
//
// A nil expr yields a nil slice.
func ExtractTestableLineFilters(expr syntax.Expr) []syntax.LineFilterExpr {
	if expr == nil {
		return nil
	}

	var (
		collected  []syntax.LineFilterExpr
		sawLineFmt bool
	)

	// Keep line filters only while no line format stage has been visited.
	onLineFilter := func(_ syntax.RootVisitor, lf *syntax.LineFilterExpr) {
		if lf == nil || sawLineFmt {
			return
		}
		collected = append(collected, *lf)
	}

	// Remember that a line format stage was visited.
	onLineFmt := func(_ syntax.RootVisitor, lf *syntax.LineFmtExpr) {
		if lf != nil {
			sawLineFmt = true
		}
	}

	expr.Accept(&syntax.DepthFirstTraversal{
		VisitLineFilterFn: onLineFilter,
		VisitLineFmtFn:    onLineFmt,
	})
	return collected
}

// FiltersToBloomTest turns a list of line filters into a single BloomTest.
// The caller is expected to pass only filters that are testable against a
// bloom filter; use ExtractTestableLineFilters to obtain them from an
// expression.
//
// For each filter, in order:
//   - a non-nil Left is converted recursively and added to the result;
//   - a non-nil Or is converted recursively and combined with the filter's own
//     test through newOrTest;
//   - otherwise the filter's own test is added as is.
//
// TODO(owen-d): limit the number of bloom lookups run.
// An arbitrarily high number can overconsume cpu and is a DoS vector.
// TODO(owen-d): use for loop not recursion to protect callstack
func FiltersToBloomTest(b NGramBuilder, filters ...syntax.LineFilterExpr) BloomTest {
	out := make(BloomTests, 0, len(filters))
	for i := range filters {
		expr := filters[i]

		if expr.Left != nil {
			out = append(out, FiltersToBloomTest(b, *expr.Left))
		}

		if expr.Or == nil {
			out = append(out, simpleFilterToBloomTest(b, expr.LineFilter))
			continue
		}

		orSide := FiltersToBloomTest(b, *expr.Or)
		own := simpleFilterToBloomTest(b, expr.LineFilter)
		out = append(out, newOrTest(orSide, own))
	}
	return out
}

// simpleFilterToBloomTest maps a single line filter to a BloomTest based on
// its match type. Only equality and pattern filters produce a real test; all
// other types fall through to MatchAll.
func simpleFilterToBloomTest(b NGramBuilder, filter syntax.LineFilter) BloomTest {
	switch filter.Ty {
	case log.LineMatchEqual:
		return newStringTest(b, filter.Match)
	case log.LineMatchPattern:
		return newPatternTest(b, filter.Match)
	}

	// Everything else cannot be narrowed down with a bloom filter:
	//   - Negated filters (log.LineMatchNotEqual, log.LineMatchNotRegexp,
	//     log.LineMatchNotPattern): blooms are probabilistic and can only say
	//     that a string _might_ exist. For `!= "foo"`, a positive answer for
	//     "foo" does not prove it is present, so the chunk cannot be discarded.
	//   - Regular expressions (log.LineMatchRegexp) and any unknown type.
	return MatchAll
}

// bloomCheckerWrapper adapts a filter.Checker so it can be used where a
// log.Checker is expected.
type bloomCheckerWrapper struct {
	bloom filter.Checker
}

// Test implements the log.Checker interface by forwarding line to the wrapped
// bloom checker. The two boolean arguments are ignored.
func (w bloomCheckerWrapper) Test(line []byte, _ bool, _ bool) bool {
	found := w.bloom.Test(line)
	return found
}

// TestRegex implements the log.Checker interface.
// Regexes are not supported in bloom filters, so this always reports a match.
func (bloomCheckerWrapper) TestRegex(_ *regexp.Regexp) bool {
	return true
}

// logCheckerWrapper adapts a log.Checker so it can be used where a
// filter.Checker is expected.
type logCheckerWrapper struct {
	checker log.Checker
}

// Test implements the filter.Checker interface by calling the wrapped
// log.Checker with the fixed flag values (true, false).
func (w logCheckerWrapper) Test(data []byte) bool {
	matched := w.checker.Test(data, true, false)
	return matched
}

// matcherFilterWrapper runs a log.Matcher against a bloom filter checker.
type matcherFilterWrapper struct {
	filter log.Matcher
}

// Matches evaluates the wrapped matcher against bloom, exposing bloom to the
// matcher through bloomCheckerWrapper.
func (w matcherFilterWrapper) Matches(bloom filter.Checker) bool {
	adapter := bloomCheckerWrapper{bloom: bloom}
	return w.filter.Matches(adapter)
}

// MatchesWithPrefixBuf is like Matches, but every value tested against bloom
// is first prepended with the first prefixLen bytes of buf (see
// prefixedChecker).
func (w matcherFilterWrapper) MatchesWithPrefixBuf(bloom filter.Checker, buf []byte, prefixLen int) bool {
	prefixed := prefixedChecker{
		checker:   bloom,
		buf:       buf,
		prefixLen: prefixLen,
	}
	return w.filter.Matches(bloomCheckerWrapper{bloom: prefixed})
}

// prefixedChecker is a filter.Checker that prepends a fixed prefix to every
// value before handing it to the underlying checker.
type prefixedChecker struct {
	checker   filter.Checker
	buf       []byte
	prefixLen int
}

// Test appends data after the first prefixLen bytes of buf and tests the
// combined bytes against the underlying checker. The append may write into
// buf's backing array beyond prefixLen.
func (pc prefixedChecker) Test(data []byte) bool {
	prefix := pc.buf[:pc.prefixLen]
	key := append(prefix, data...)
	return pc.checker.Test(key)
}

// matchAllTest is a BloomTest that accepts every bloom filter.
type matchAllTest struct{}

// MatchAll is the shared matchAllTest value, used wherever a filter cannot be
// narrowed down with a bloom.
var MatchAll = matchAllTest{}

// Matches implements BloomTest and always returns true.
func (matchAllTest) Matches(filter.Checker) bool { return true }

// MatchesWithPrefixBuf implements BloomTest and always returns true.
func (matchAllTest) MatchesWithPrefixBuf(filter.Checker, []byte, int) bool { return true }

// NGramBuilder is an interface for tokenizing strings into ngrams
// Extracting this interface allows us to test the bloom filter without having to use the actual tokenizer
// TODO: This should be moved to tokenizer.go
type NGramBuilder interface {
// Tokens returns an iterator over the tokens (as byte slices) produced for line.
Tokens(line string) iter.Iterator[[]byte]
// N returns the n-gram length; newStringTest compares it, together with
// SkipFactor, against the rune count of the search string.
N() int
// SkipFactor returns the skip factor; newStringTest builds one test per
// offset from 0 through SkipFactor inclusive.
SkipFactor() int
}

// stringTest is a BloomTest over a fixed list of n-grams: it matches only when
// every n-gram is reported present by the bloom filter.
type stringTest struct {
	ngrams [][]byte
}

// newStringTest builds a BloomTest for the literal search string.
//
// If search has fewer runes than b.N()+b.SkipFactor(), MatchAll is returned:
// the search string must be at least that long for every possible skip offset
// to yield at least one n-gram.
//
// Otherwise one stringTest is built per offset i in [0, SkipFactor]: the first
// i runes of search are dropped and the remainder is tokenized with b. The
// per-offset tests are then combined with newOrTest, so the result matches if
// any of them matches.
func newStringTest(b NGramBuilder, search string) (res BloomTest) {
	skip := b.SkipFactor()
	if ct := utf8.RuneCountInString(search); ct < b.N()+skip {
		return MatchAll
	}

	// The loop below produces skip+1 tests (offsets 0..skip inclusive), so
	// size the slice for all of them up front to avoid a reallocation.
	tests := make([]stringTest, 0, skip+1)

	for i := 0; i < skip+1; i++ {
		searchWithOffset := search
		for j := 0; j < i; j++ {
			_, size := utf8.DecodeRuneInString(searchWithOffset)
			// NB(owen-d): small bounds check for invalid utf8
			searchWithOffset = searchWithOffset[min(size, len(searchWithOffset)):]
		}

		var test stringTest
		it := b.Tokens(searchWithOffset)
		for it.Next() {
			// Copy the token: the test keeps it after the iterator advances.
			tok := it.At()
			ngram := make([]byte, len(tok))
			copy(ngram, tok)
			test.ngrams = append(test.ngrams, ngram)
		}
		tests = append(tests, test)
	}

	res = tests[0]
	for _, t := range tests[1:] {
		res = newOrTest(res, t)
	}
	return res
}

// Matches implements the BloomTest interface. It returns true only when every
// n-gram of the test is reported present by bloom, and stops at the first
// missing n-gram.
func (st stringTest) Matches(bloom filter.Checker) bool {
	for i := 0; i < len(st.ngrams); i++ {
		if bloom.Test(st.ngrams[i]) {
			continue
		}
		return false
	}
	return true
}

// MatchesWithPrefixBuf implements the BloomTest interface. Each n-gram is
// appended after the first prefixLen bytes of buf and the combined key is
// tested against bloom; the first missing key ends the check with false.
func (st stringTest) MatchesWithPrefixBuf(bloom filter.Checker, buf []byte, prefixLen int) bool {
	for i := range st.ngrams {
		// Reuse buf across iterations: keep the prefix, replace the suffix.
		buf = append(buf[:prefixLen], st.ngrams[i]...)
		if found := bloom.Test(buf); !found {
			return false
		}
	}
	return true
}

// stringMatcherFilter runs a BloomTest against a log.Checker.
type stringMatcherFilter struct {
	test BloomTest
}

// Matches wraps the given log.Checker in a logCheckerWrapper and evaluates
// the held BloomTest against it.
func (f stringMatcherFilter) Matches(test log.Checker) bool {
	adapted := logCheckerWrapper{checker: test}
	return f.test.Matches(adapted)
}

// newStringFilterFunc returns a log.NewMatcherFiltererFunc that, for each
// match value, builds a string test with b and wraps it via log.WrapMatcher.
// The boolean argument of the returned function is ignored.
func newStringFilterFunc(b NGramBuilder) log.NewMatcherFiltererFunc {
	return func(match []byte, _ bool) log.MatcherFilterer {
		inner := stringMatcherFilter{test: newStringTest(b, string(match))}
		return log.WrapMatcher(inner)
	}
}

// orTest is a BloomTest combining two tests; its methods report a match when
// either the left or the right test matches.
type orTest struct {
left, right BloomTest
}
Expand All @@ -286,10 +71,12 @@ func newOrTest(left, right BloomTest) orTest {
}
}

// Matches implements BloomTest. It reports true if either side matches; the
// right side is only evaluated when the left side does not match.
func (o orTest) Matches(bloom filter.Checker) bool {
	if o.left.Matches(bloom) {
		return true
	}
	return o.right.Matches(bloom)
}

// MatchesWithPrefixBuf implements BloomTest. It reports true if either side
// matches with the given prefix buffer; the right side is only evaluated when
// the left side does not match.
func (o orTest) MatchesWithPrefixBuf(bloom filter.Checker, buf []byte, prefixLen int) bool {
	if o.left.MatchesWithPrefixBuf(bloom, buf, prefixLen) {
		return true
	}
	return o.right.MatchesWithPrefixBuf(bloom, buf, prefixLen)
}
Expand All @@ -305,28 +92,16 @@ func newAndTest(left, right BloomTest) andTest {
}
}

// Matches implements BloomTest. It reports true only if both sides match; the
// right side is only evaluated when the left side matches.
func (a andTest) Matches(bloom filter.Checker) bool {
	if !a.left.Matches(bloom) {
		return false
	}
	return a.right.Matches(bloom)
}

// MatchesWithPrefixBuf implements BloomTest. It reports true only if both
// sides match with the given prefix buffer; the right side is only evaluated
// when the left side matches.
func (a andTest) MatchesWithPrefixBuf(bloom filter.Checker, buf []byte, prefixLen int) bool {
	if !a.left.MatchesWithPrefixBuf(bloom, buf, prefixLen) {
		return false
	}
	return a.right.MatchesWithPrefixBuf(bloom, buf, prefixLen)
}

// newPatternTest builds a BloomTest for a pattern filter. The literals of the
// pattern are extracted with pattern.ParseLiterals and each one becomes a
// string test; the tests are returned together as a BloomTests value.
// If the pattern cannot be parsed, MatchAll is returned.
func newPatternTest(b NGramBuilder, match string) BloomTest {
	literals, err := pattern.ParseLiterals(match)
	if err != nil {
		return MatchAll
	}

	var tests BloomTests
	for i := range literals {
		tests = append(tests, newStringTest(b, string(literals[i])))
	}
	return tests
}

func LabelMatchersToBloomTest(matchers ...LabelMatcher) BloomTest {
tests := make(BloomTests, 0, len(matchers))
for _, matcher := range matchers {
Expand Down
Loading
Loading