From 7f6bf488ec0c4f6918d2f0e3b25915039ef20927 Mon Sep 17 00:00:00 2001 From: Christian Haudum Date: Thu, 19 Sep 2024 11:46:10 +0200 Subject: [PATCH 1/6] Remove n-gram length and n-gram skip Signed-off-by: Christian Haudum --- pkg/bloombuild/builder/builder.go | 4 +-- pkg/bloombuild/builder/config.go | 2 -- pkg/bloombuild/builder/spec.go | 2 -- pkg/bloombuild/builder/spec_test.go | 8 ++--- pkg/bloombuild/planner/planner_test.go | 2 +- pkg/storage/bloom/v1/bloom_tokenizer.go | 25 ++++---------- pkg/storage/bloom/v1/bloom_tokenizer_test.go | 33 ++++--------------- pkg/storage/bloom/v1/builder.go | 8 ++--- pkg/storage/bloom/v1/builder_test.go | 24 +++++--------- pkg/storage/bloom/v1/fuse_test.go | 6 ++-- pkg/storage/bloom/v1/schema.go | 31 +++++++---------- pkg/storage/bloom/v1/test_util.go | 6 ++-- .../bloom/v1/versioned_builder_test.go | 6 ++-- pkg/validation/limits.go | 24 ++------------ 14 files changed, 51 insertions(+), 130 deletions(-) diff --git a/pkg/bloombuild/builder/builder.go b/pkg/bloombuild/builder/builder.go index 24710944f8e10..78932e3f3e9ec 100644 --- a/pkg/bloombuild/builder/builder.go +++ b/pkg/bloombuild/builder/builder.go @@ -340,11 +340,9 @@ func (b *Builder) processTask( var ( blockCt int - nGramSize = uint64(b.limits.BloomNGramLength(tenant)) - nGramSkip = uint64(b.limits.BloomNGramSkip(tenant)) maxBlockSize = uint64(b.limits.BloomMaxBlockSize(tenant)) maxBloomSize = uint64(b.limits.BloomMaxBloomSize(tenant)) - blockOpts = v1.NewBlockOptions(blockEnc, nGramSize, nGramSkip, maxBlockSize, maxBloomSize) + blockOpts = v1.NewBlockOptions(blockEnc, maxBlockSize, maxBloomSize) created []bloomshipper.Meta totalSeries int bytesAdded int diff --git a/pkg/bloombuild/builder/config.go b/pkg/bloombuild/builder/config.go index ddacfd884e10c..dcb44c55b5f31 100644 --- a/pkg/bloombuild/builder/config.go +++ b/pkg/bloombuild/builder/config.go @@ -38,8 +38,6 @@ func (cfg *Config) Validate() error { type Limits interface { BloomBlockEncoding(tenantID string) string - BloomNGramLength(tenantID string) int - BloomNGramSkip(tenantID string) int BloomMaxBlockSize(tenantID string) int BloomMaxBloomSize(tenantID string) int } diff --git a/pkg/bloombuild/builder/spec.go b/pkg/bloombuild/builder/spec.go index 82457cf92b84a..180c2fc32cb00 100644 --- a/pkg/bloombuild/builder/spec.go +++ b/pkg/bloombuild/builder/spec.go @@ -90,8 +90,6 @@ func NewSimpleBloomGenerator( reporter: reporter, tokenizer: v1.NewBloomTokenizer( - opts.Schema.NGramLen(), - opts.Schema.NGramSkip(), int(opts.UnencodedBlockOptions.MaxBloomSizeBytes), metrics, log.With( diff --git a/pkg/bloombuild/builder/spec_test.go b/pkg/bloombuild/builder/spec_test.go index 62e3f70bf22f9..330c0552b657f 100644 --- a/pkg/bloombuild/builder/spec_test.go +++ b/pkg/bloombuild/builder/spec_test.go @@ -123,13 +123,13 @@ func TestSimpleBloomGenerator(t *testing.T) { }{ { desc: "SkipsIncompatibleSchemas", - fromSchema: v1.NewBlockOptions(enc, 3, 0, maxBlockSize, 0), - toSchema: v1.NewBlockOptions(enc, 4, 0, maxBlockSize, 0), + fromSchema: v1.NewBlockOptions(enc, maxBlockSize, 0), + toSchema: v1.NewBlockOptions(enc, maxBlockSize, 0), }, { desc: "CombinesBlocks", - fromSchema: v1.NewBlockOptions(enc, 4, 0, maxBlockSize, 0), - toSchema: v1.NewBlockOptions(enc, 4, 0, maxBlockSize, 0), + fromSchema: v1.NewBlockOptions(enc, maxBlockSize, 0), + toSchema: v1.NewBlockOptions(enc, maxBlockSize, 0), }, } { t.Run(fmt.Sprintf("%s/%s", tc.desc, enc), func(t *testing.T) { diff --git a/pkg/bloombuild/planner/planner_test.go b/pkg/bloombuild/planner/planner_test.go index fbd3a7bac5305..482d277589c37 100644 --- a/pkg/bloombuild/planner/planner_test.go +++ b/pkg/bloombuild/planner/planner_test.go @@ -188,7 +188,7 @@ func genBlock(ref bloomshipper.BlockRef) (bloomshipper.Block, error) { writer := v1.NewMemoryBlockWriter(indexBuf, bloomsBuf) reader := v1.NewByteReader(indexBuf, bloomsBuf) - blockOpts := v1.NewBlockOptions(compression.EncNone, 4, 1, 0, 0) + blockOpts := v1.NewBlockOptions(compression.EncNone, 0, 0) builder, err := v1.NewBlockBuilder(blockOpts, writer) if err != nil { diff --git a/pkg/storage/bloom/v1/bloom_tokenizer.go b/pkg/storage/bloom/v1/bloom_tokenizer.go index ec4edf4ac9369..3d04728937cf0 100644 --- a/pkg/storage/bloom/v1/bloom_tokenizer.go +++ b/pkg/storage/bloom/v1/bloom_tokenizer.go @@ -23,9 +23,8 @@ type BloomTokenizer struct { metrics *Metrics logger log.Logger - maxBloomSize int // size in bytes - lineTokenizer *NGramTokenizer - cache map[string]interface{} + maxBloomSize int // size in bytes + cache map[string]interface{} } const cacheSize = 150000 @@ -37,25 +36,15 @@ const eightBits = 8 // 1) The token slices generated must not be mutated externally // 2) The token slice must not be used after the next call to `Tokens()` as it will repopulate the slice. // 2) This is not thread safe. -func NewBloomTokenizer(nGramLen, nGramSkip int, maxBloomSize int, metrics *Metrics, logger log.Logger) *BloomTokenizer { - level.Info(logger).Log("msg", "create new bloom tokenizer", "ngram length", nGramLen, "ngram skip", nGramSkip) +func NewBloomTokenizer(maxBloomSize int, metrics *Metrics, logger log.Logger) *BloomTokenizer { return &BloomTokenizer{ - metrics: metrics, - logger: logger, - cache: make(map[string]interface{}, cacheSize), - lineTokenizer: NewNGramTokenizer(nGramLen, nGramSkip), - maxBloomSize: maxBloomSize, + metrics: metrics, + logger: logger, + cache: make(map[string]interface{}, cacheSize), + maxBloomSize: maxBloomSize, } } -func (bt *BloomTokenizer) N() uint64 { - return uint64(bt.lineTokenizer.N()) -} - -func (bt *BloomTokenizer) SkipFactor() uint64 { - return uint64(bt.lineTokenizer.SkipFactor()) -} - // prefixedToken returns a byte slice with sufficient capacity for a chunk-ref prefixed token // of specific ngram length, along with the length of the prefix. // It ensures enough capacity for the prefix and the token so additional tokens can be created diff --git a/pkg/storage/bloom/v1/bloom_tokenizer_test.go b/pkg/storage/bloom/v1/bloom_tokenizer_test.go index b5145f5f93097..c2ae166ad67bf 100644 --- a/pkg/storage/bloom/v1/bloom_tokenizer_test.go +++ b/pkg/storage/bloom/v1/bloom_tokenizer_test.go @@ -28,11 +28,6 @@ import ( "github.com/prometheus/client_golang/prometheus" ) -const ( - DefaultNGramLength = 4 - DefaultNGramSkip = 0 -) - var ( four = NewNGramTokenizer(4, 0) metrics = NewMetrics(prometheus.DefaultRegisterer) @@ -82,24 +77,10 @@ func TestPrefixedKeyCreation(t *testing.T) { } } -func TestSetLineTokenizer(t *testing.T) { - t.Parallel() - bt := NewBloomTokenizer(DefaultNGramLength, DefaultNGramSkip, 0, metrics, logger.NewNopLogger()) - - // Validate defaults - require.Equal(t, bt.lineTokenizer.N(), DefaultNGramLength) - require.Equal(t, bt.lineTokenizer.SkipFactor(), DefaultNGramSkip) - - // Set new tokenizer, and validate against that - bt.lineTokenizer = NewNGramTokenizer(6, 7) - require.Equal(t, bt.lineTokenizer.N(), 6) - require.Equal(t, bt.lineTokenizer.SkipFactor(), 7) -} - func TestTokenizerPopulate(t *testing.T) { t.Parallel() var testLine = "this is a log line" - bt := NewBloomTokenizer(DefaultNGramLength, DefaultNGramSkip, 0, metrics, logger.NewNopLogger()) + bt := NewBloomTokenizer(0, metrics, logger.NewNopLogger()) metadata := push.LabelsAdapter{ {Name: "pod", Value: "loki-1"}, @@ -144,7 +125,7 @@ func TestTokenizerPopulate(t *testing.T) { func TestBloomTokenizerPopulateWithoutPreexistingBloom(t *testing.T) { var testLine = "this is a log line" - bt := NewBloomTokenizer(DefaultNGramLength, DefaultNGramSkip, 0, metrics, logger.NewNopLogger()) + bt := NewBloomTokenizer(0, metrics, logger.NewNopLogger()) metadata := push.LabelsAdapter{ {Name: "pod", Value: "loki-1"}, @@ -221,7 +202,7 @@ func randomStr(ln int) string { func TestTokenizerPopulateWontExceedMaxSize(t *testing.T) { maxSize := 4 << 10 - bt := NewBloomTokenizer(DefaultNGramLength, DefaultNGramSkip, maxSize, NewMetrics(nil), logger.NewNopLogger()) + bt := NewBloomTokenizer(maxSize, NewMetrics(nil), logger.NewNopLogger()) ch := make(chan *BloomCreation) metadata := make([]push.LabelsAdapter, 0, 4<<10) @@ -269,7 +250,7 @@ func populateAndConsumeBloom( func BenchmarkPopulateSeriesWithBloom(b *testing.B) { for i := 0; i < b.N; i++ { var testLine = lorem + lorem + lorem - bt := NewBloomTokenizer(DefaultNGramLength, DefaultNGramSkip, 0, metrics, logger.NewNopLogger()) + bt := NewBloomTokenizer(0, metrics, logger.NewNopLogger()) sbf := filter.NewScalableBloomFilter(1024, 0.01, 0.8) @@ -302,7 +283,7 @@ func BenchmarkPopulateSeriesWithBloom(b *testing.B) { } func TestTokenizerClearsCacheBetweenPopulateCalls(t *testing.T) { - bt := NewBloomTokenizer(DefaultNGramLength, DefaultNGramSkip, 0, NewMetrics(nil), logger.NewNopLogger()) + bt := NewBloomTokenizer(0, NewMetrics(nil), logger.NewNopLogger()) md := push.LabelsAdapter{ {Name: "trace_id", Value: "3bef3c91643bde73"}, } @@ -340,7 +321,7 @@ func TestTokenizerClearsCacheBetweenPopulateCalls(t *testing.T) { } func BenchmarkMapClear(b *testing.B) { - bt := NewBloomTokenizer(DefaultNGramLength, DefaultNGramSkip, 0, metrics, logger.NewNopLogger()) + bt := NewBloomTokenizer(0, metrics, logger.NewNopLogger()) for i := 0; i < b.N; i++ { for k := 0; k < cacheSize; k++ { bt.cache[fmt.Sprint(k)] = k @@ -351,7 +332,7 @@ func BenchmarkMapClear(b *testing.B) { } func BenchmarkNewMap(b *testing.B) { - bt := NewBloomTokenizer(DefaultNGramLength, DefaultNGramSkip, 0, metrics, logger.NewNopLogger()) + bt := NewBloomTokenizer(0, metrics, logger.NewNopLogger()) for i := 0; i < b.N; i++ { for k := 0; k < cacheSize; k++ { bt.cache[fmt.Sprint(k)] = k diff --git a/pkg/storage/bloom/v1/builder.go b/pkg/storage/bloom/v1/builder.go index 1e5278c24d31a..664eb60cd596f 100644 --- a/pkg/storage/bloom/v1/builder.go +++ b/pkg/storage/bloom/v1/builder.go @@ -66,12 +66,10 @@ func (b BlockOptions) Encode(enc *encoding.Encbuf) { enc.PutBE64(b.BlockSize) } -func NewBlockOptions(enc compression.Encoding, nGramLength, nGramSkip, maxBlockSizeBytes, maxBloomSizeBytes uint64) BlockOptions { +func NewBlockOptions(enc compression.Encoding, maxBlockSizeBytes, maxBloomSizeBytes uint64) BlockOptions { opts := NewBlockOptionsFromSchema(Schema{ - version: CurrentSchemaVersion, - encoding: enc, - nGramLength: nGramLength, - nGramSkip: nGramSkip, + version: CurrentSchemaVersion, + encoding: enc, }) opts.BlockSize = maxBlockSizeBytes opts.UnencodedBlockOptions.MaxBloomSizeBytes = maxBloomSizeBytes diff --git a/pkg/storage/bloom/v1/builder_test.go b/pkg/storage/bloom/v1/builder_test.go index 3664b60d515f7..a2682921930f8 100644 --- a/pkg/storage/bloom/v1/builder_test.go +++ b/pkg/storage/bloom/v1/builder_test.go @@ -27,10 +27,8 @@ func TestBlockOptions_RoundTrip(t *testing.T) { t.Parallel() opts := BlockOptions{ Schema: Schema{ - version: CurrentSchemaVersion, - encoding: compression.EncSnappy, - nGramLength: 10, - nGramSkip: 2, + version: CurrentSchemaVersion, + encoding: compression.EncSnappy, }, SeriesPageSize: 100, BloomPageSize: 10 << 10, @@ -87,10 +85,8 @@ func TestBlockBuilder_RoundTrip(t *testing.T) { t.Run(desc, func(t *testing.T) { blockOpts := BlockOptions{ Schema: Schema{ - version: CurrentSchemaVersion, - encoding: enc, - nGramLength: 10, - nGramSkip: 2, + version: CurrentSchemaVersion, + encoding: enc, }, SeriesPageSize: 100, BloomPageSize: 10 << 10, @@ -398,10 +394,8 @@ func TestBlockReset(t *testing.T) { reader := NewByteReader(indexBuf, bloomsBuf) schema := Schema{ - version: CurrentSchemaVersion, - encoding: compression.EncSnappy, - nGramLength: 10, - nGramSkip: 2, + version: CurrentSchemaVersion, + encoding: compression.EncSnappy, } builder, err := NewBlockBuilder( @@ -456,10 +450,8 @@ func TestMergeBuilder_Roundtrip(t *testing.T) { blockOpts := BlockOptions{ Schema: Schema{ - version: CurrentSchemaVersion, - encoding: compression.EncSnappy, // test with different encodings? - nGramLength: 4, // needs to match values from MkBasicSeriesWithBlooms - nGramSkip: 0, // needs to match values from MkBasicSeriesWithBlooms + version: CurrentSchemaVersion, + encoding: compression.EncSnappy, // test with different encodings? }, SeriesPageSize: 100, BloomPageSize: 10 << 10, diff --git a/pkg/storage/bloom/v1/fuse_test.go b/pkg/storage/bloom/v1/fuse_test.go index befa5a7a9fa54..b246333021635 100644 --- a/pkg/storage/bloom/v1/fuse_test.go +++ b/pkg/storage/bloom/v1/fuse_test.go @@ -154,10 +154,8 @@ func TestFuseMultiPage(t *testing.T) { builder, err := NewBlockBuilder( BlockOptions{ Schema: Schema{ - version: CurrentSchemaVersion, - encoding: compression.EncSnappy, - nGramLength: 3, // we test trigrams - nGramSkip: 0, + version: CurrentSchemaVersion, + encoding: compression.EncSnappy, }, SeriesPageSize: 100, BloomPageSize: 10, // So we force one bloom per page diff --git a/pkg/storage/bloom/v1/schema.go b/pkg/storage/bloom/v1/schema.go index dd532b61559f5..ae2f8fb6a251d 100644 --- a/pkg/storage/bloom/v1/schema.go +++ b/pkg/storage/bloom/v1/schema.go @@ -38,22 +38,19 @@ var ( ) type Schema struct { - version Version - encoding compression.Encoding - nGramLength, nGramSkip uint64 + version Version + encoding compression.Encoding } func NewSchema() Schema { return Schema{ - version: CurrentSchemaVersion, - encoding: compression.EncNone, - nGramLength: 0, - nGramSkip: 0, + version: CurrentSchemaVersion, + encoding: compression.EncNone, } } func (s Schema) String() string { - return fmt.Sprintf("%s,encoding=%s,ngram=%d,skip=%d", s.version, s.encoding, s.nGramLength, s.nGramSkip) + return fmt.Sprintf("%s,encoding=%s", s.version, s.encoding) } func (s Schema) Compatible(other Schema) bool { @@ -64,14 +61,6 @@ func (s Schema) Version() Version { return s.version } -func (s Schema) NGramLen() int { - return int(s.nGramLength) -} - -func (s Schema) NGramSkip() int { - return int(s.nGramSkip) -} - // byte length func (s Schema) Len() int { // magic number + version + encoding + ngram length + ngram skip @@ -91,8 +80,9 @@ func (s *Schema) Encode(enc *encoding.Encbuf) { enc.PutBE32(magicNumber) enc.PutByte(byte(s.version)) enc.PutByte(byte(s.encoding)) - enc.PutBE64(s.nGramLength) - enc.PutBE64(s.nGramSkip) + // kept to keep compatibility + enc.PutBE64(0) // previously n-gram length + enc.PutBE64(0) // previously n-gram skip } @@ -123,8 +113,9 @@ func (s *Schema) Decode(dec *encoding.Decbuf) error { return errors.Wrap(err, "parsing encoding") } - s.nGramLength = dec.Be64() - s.nGramSkip = dec.Be64() + // kept to keep compatibility + _ = dec.Be64() // previously n-gram length + _ = dec.Be64() // previously n-gram skip return dec.Err() } diff --git a/pkg/storage/bloom/v1/test_util.go b/pkg/storage/bloom/v1/test_util.go index 6de5c85be4b8a..e8997a8cc2419 100644 --- a/pkg/storage/bloom/v1/test_util.go +++ b/pkg/storage/bloom/v1/test_util.go @@ -29,10 +29,8 @@ func MakeBlock(t testing.TB, nth int, fromFp, throughFp model.Fingerprint, fromT builder, err := NewBlockBuilder( BlockOptions{ Schema: Schema{ - version: CurrentSchemaVersion, - encoding: compression.EncSnappy, - nGramLength: 4, // see DefaultNGramLength in bloom_tokenizer_test.go - nGramSkip: 0, // see DefaultNGramSkip in bloom_tokenizer_test.go + version: CurrentSchemaVersion, + encoding: compression.EncSnappy, }, SeriesPageSize: 100, BloomPageSize: 10 << 10, diff --git a/pkg/storage/bloom/v1/versioned_builder_test.go b/pkg/storage/bloom/v1/versioned_builder_test.go index 2ef08daad8939..07240fe603586 100644 --- a/pkg/storage/bloom/v1/versioned_builder_test.go +++ b/pkg/storage/bloom/v1/versioned_builder_test.go @@ -17,10 +17,8 @@ import ( func smallBlockOpts(v Version, enc compression.Encoding) BlockOptions { return BlockOptions{ Schema: Schema{ - version: v, - encoding: enc, - nGramLength: 4, - nGramSkip: 0, + version: v, + encoding: enc, }, SeriesPageSize: 100, BloomPageSize: 2 << 10, diff --git a/pkg/validation/limits.go b/pkg/validation/limits.go index 58e93b3937d84..153073b74e6c2 100644 --- a/pkg/validation/limits.go +++ b/pkg/validation/limits.go @@ -207,12 +207,9 @@ type Limits struct { BloomBuildTaskMaxRetries int `yaml:"bloom_build_task_max_retries" json:"bloom_build_task_max_retries" category:"experimental"` BloomBuilderResponseTimeout time.Duration `yaml:"bloom_build_builder_response_timeout" json:"bloom_build_builder_response_timeout" category:"experimental"` - BloomCreationEnabled bool `yaml:"bloom_creation_enabled" json:"bloom_creation_enabled" category:"experimental"` - BloomSplitSeriesKeyspaceBy int `yaml:"bloom_split_series_keyspace_by" json:"bloom_split_series_keyspace_by" category:"experimental"` - BloomNGramLength int `yaml:"bloom_ngram_length" json:"bloom_ngram_length" category:"experimental"` - BloomNGramSkip int `yaml:"bloom_ngram_skip" json:"bloom_ngram_skip" category:"experimental"` - BloomFalsePositiveRate float64 `yaml:"bloom_false_positive_rate" json:"bloom_false_positive_rate" category:"experimental"` - BloomBlockEncoding string `yaml:"bloom_block_encoding" json:"bloom_block_encoding" category:"experimental"` + BloomCreationEnabled bool `yaml:"bloom_creation_enabled" json:"bloom_creation_enabled" category:"experimental"` + BloomSplitSeriesKeyspaceBy int `yaml:"bloom_split_series_keyspace_by" json:"bloom_split_series_keyspace_by" category:"experimental"` + BloomBlockEncoding string `yaml:"bloom_block_encoding" json:"bloom_block_encoding" category:"experimental"` BloomMaxBlockSize flagext.ByteSize `yaml:"bloom_max_block_size" json:"bloom_max_block_size" category:"experimental"` BloomMaxBloomSize flagext.ByteSize `yaml:"bloom_max_bloom_size" json:"bloom_max_bloom_size" category:"experimental"` @@ -379,9 +376,6 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { f.BoolVar(&l.BloomGatewayEnabled, "bloom-gateway.enable-filtering", false, "Experimental. Whether to use the bloom gateway component in the read path to filter chunks.") f.DurationVar(&l.BloomGatewayCacheKeyInterval, "bloom-gateway.cache-key-interval", 15*time.Minute, "Experimental. Interval for computing the cache key in the Bloom Gateway.") - f.IntVar(&l.BloomNGramLength, "bloom-build.ngram-length", 4, "Experimental. Length of the n-grams created when computing blooms from log lines.") - f.IntVar(&l.BloomNGramSkip, "bloom-build.ngram-skip", 1, "Experimental. Skip factor for the n-grams created when computing blooms from log lines.") - f.Float64Var(&l.BloomFalsePositiveRate, "bloom-build.false-positive-rate", 0.01, "Experimental. Scalable Bloom Filter desired false-positive rate.") f.StringVar(&l.BloomBlockEncoding, "bloom-build.block-encoding", "none", "Experimental. Compression algorithm for bloom block pages.") _ = l.BloomMaxBlockSize.Set(defaultBloomBuildMaxBlockSize) @@ -1010,14 +1004,6 @@ func (o *Overrides) BloomTaskMaxRetries(userID string) int { return o.getOverridesForUser(userID).BloomBuildTaskMaxRetries } -func (o *Overrides) BloomNGramLength(userID string) int { - return o.getOverridesForUser(userID).BloomNGramLength -} - -func (o *Overrides) BloomNGramSkip(userID string) int { - return o.getOverridesForUser(userID).BloomNGramSkip -} - func (o *Overrides) BloomMaxBlockSize(userID string) int { return o.getOverridesForUser(userID).BloomMaxBlockSize.Val() } @@ -1026,10 +1012,6 @@ func (o *Overrides) BloomMaxBloomSize(userID string) int { return o.getOverridesForUser(userID).BloomMaxBloomSize.Val() } -func (o *Overrides) BloomFalsePositiveRate(userID string) float64 { - return o.getOverridesForUser(userID).BloomFalsePositiveRate -} - func (o *Overrides) BloomBlockEncoding(userID string) string { return o.getOverridesForUser(userID).BloomBlockEncoding } From 05e5d45c50e5ebcec78994a8c24d9df8e8ca85b3 Mon Sep 17 00:00:00 2001 From: Christian Haudum Date: Thu, 19 Sep 2024 14:03:44 +0200 Subject: [PATCH 2/6] Remove ngram bloom tester Signed-off-by: Christian Haudum --- pkg/storage/bloom/v1/bloom_tester.go | 237 +--------------------- pkg/storage/bloom/v1/bloom_tester_test.go | 112 +--------- pkg/storage/bloom/v1/fuse_test.go | 52 ++--- 3 files changed, 28 insertions(+), 373 deletions(-) diff --git a/pkg/storage/bloom/v1/bloom_tester.go b/pkg/storage/bloom/v1/bloom_tester.go index 760c3dfac27f4..6fe00a4cd1730 100644 --- a/pkg/storage/bloom/v1/bloom_tester.go +++ b/pkg/storage/bloom/v1/bloom_tester.go @@ -2,15 +2,8 @@ package v1 import ( "fmt" - "unicode/utf8" "unsafe" - "github.com/grafana/regexp" - - iter "github.com/grafana/loki/v3/pkg/iter/v2" - "github.com/grafana/loki/v3/pkg/logql/log" - "github.com/grafana/loki/v3/pkg/logql/log/pattern" - "github.com/grafana/loki/v3/pkg/logql/syntax" "github.com/grafana/loki/v3/pkg/storage/bloom/v1/filter" ) @@ -39,228 +32,20 @@ func (b BloomTests) MatchesWithPrefixBuf(bloom filter.Checker, buf []byte, prefi return true } -// ExtractTestableLineFilters extracts all line filters from an expression -// that can be tested against a bloom filter. This will skip any line filters -// after a line format expression. A line format expression might add content -// that the query later matches against, which can't be tested with a bloom filter. -// E.g. For {app="fake"} |= "foo" | line_format "thisNewTextShouldMatch" |= "thisNewTextShouldMatch" -// this function will return only the line filter for "foo" since the line filter for "thisNewTextShouldMatch" -// wouldn't match against the bloom filter but should match against the query. -func ExtractTestableLineFilters(expr syntax.Expr) []syntax.LineFilterExpr { - if expr == nil { - return nil - } - - var filters []syntax.LineFilterExpr - var lineFmtFound bool - visitor := &syntax.DepthFirstTraversal{ - VisitLineFilterFn: func(_ syntax.RootVisitor, e *syntax.LineFilterExpr) { - if e != nil && !lineFmtFound { - filters = append(filters, *e) - } - }, - VisitLineFmtFn: func(_ syntax.RootVisitor, e *syntax.LineFmtExpr) { - if e != nil { - lineFmtFound = true - } - }, - } - expr.Accept(visitor) - return filters -} - -// FiltersToBloomTest converts a list of line filters to a BloomTest. -// Note that all the line filters should be testable against a bloom filter. -// Use ExtractTestableLineFilters to extract testable line filters from an expression. -// TODO(owen-d): limits the number of bloom lookups run. -// An arbitrarily high number can overconsume cpu and is a DoS vector. -// TODO(owen-d): use for loop not recursion to protect callstack -func FiltersToBloomTest(b NGramBuilder, filters ...syntax.LineFilterExpr) BloomTest { - tests := make(BloomTests, 0, len(filters)) - for _, f := range filters { - if f.Left != nil { - tests = append(tests, FiltersToBloomTest(b, *f.Left)) - } - if f.Or != nil { - left := FiltersToBloomTest(b, *f.Or) - right := simpleFilterToBloomTest(b, f.LineFilter) - tests = append(tests, newOrTest(left, right)) - continue - } - - tests = append(tests, simpleFilterToBloomTest(b, f.LineFilter)) - } - return tests -} - -func simpleFilterToBloomTest(b NGramBuilder, filter syntax.LineFilter) BloomTest { - switch filter.Ty { - case log.LineMatchNotEqual, log.LineMatchNotRegexp, log.LineMatchNotPattern: - // We cannot test _negated_ filters with a bloom filter since blooms are probabilistic - // filters that can only tell us if a string _might_ exist. - // For example, for `!= "foo"`, the bloom filter might tell us that the string "foo" might exist - // but because we are not sure, we cannot discard that chunk because it might actually not be there. - // Therefore, we return a test that always returns true. - return MatchAll - case log.LineMatchEqual: - return newStringTest(b, filter.Match) - case log.LineMatchRegexp: - return MatchAll - case log.LineMatchPattern: - return newPatternTest(b, filter.Match) - default: - return MatchAll - } -} - -type bloomCheckerWrapper struct { - bloom filter.Checker -} - -// Test implements the log.Checker interface -func (b bloomCheckerWrapper) Test(line []byte, _ bool, _ bool) bool { - return b.bloom.Test(line) -} - -// TestRegex implements the log.Checker interface -func (b bloomCheckerWrapper) TestRegex(_ *regexp.Regexp) bool { - // We won't support regexes in bloom filters so we just return true - return true -} - -type logCheckerWrapper struct { - checker log.Checker -} - -// Test implements the filter.Checker interface -func (l logCheckerWrapper) Test(data []byte) bool { - return l.checker.Test(data, true, false) -} - -type matcherFilterWrapper struct { - filter log.Matcher -} - -func (m matcherFilterWrapper) Matches(bloom filter.Checker) bool { - return m.filter.Matches(bloomCheckerWrapper{bloom}) -} - -func (m matcherFilterWrapper) MatchesWithPrefixBuf(bloom filter.Checker, buf []byte, prefixLen int) bool { - return m.filter.Matches(bloomCheckerWrapper{prefixedChecker{ - checker: bloom, - buf: buf, - prefixLen: prefixLen, - }}) -} - -type prefixedChecker struct { - checker filter.Checker - buf []byte - prefixLen int -} - -func (p prefixedChecker) Test(data []byte) bool { - return p.checker.Test(append(p.buf[:p.prefixLen], data...)) -} - type matchAllTest struct{} var MatchAll = matchAllTest{} +// Matches implements BloomTest func (n matchAllTest) Matches(_ filter.Checker) bool { return true } +// MatchesWithPrefixBuf implements BloomTest func (n matchAllTest) MatchesWithPrefixBuf(_ filter.Checker, _ []byte, _ int) bool { return true } -// NGramBuilder is an interface for tokenizing strings into ngrams -// Extracting this interface allows us to test the bloom filter without having to use the actual tokenizer -// TODO: This should be moved to tokenizer.go -type NGramBuilder interface { - Tokens(line string) iter.Iterator[[]byte] - N() int - SkipFactor() int -} - -type stringTest struct { - ngrams [][]byte -} - -func newStringTest(b NGramBuilder, search string) (res BloomTest) { - // search string must be longer than the combined ngram length and skip factor - // in order for all possible skip offsets to have at least 1 ngram - skip := b.SkipFactor() - if ct := utf8.RuneCountInString(search); ct < b.N()+skip { - return MatchAll - } - - tests := make([]stringTest, 0, skip) - - for i := 0; i < skip+1; i++ { - searchWithOffset := search - for j := 0; j < i; j++ { - _, size := utf8.DecodeRuneInString(searchWithOffset) - // NB(owen-d): small bounds check for invalid utf8 - searchWithOffset = searchWithOffset[min(size, len(searchWithOffset)):] - } - - var test stringTest - it := b.Tokens(searchWithOffset) - for it.Next() { - ngram := make([]byte, len(it.At())) - copy(ngram, it.At()) - test.ngrams = append(test.ngrams, ngram) - } - tests = append(tests, test) - } - - res = tests[0] - for _, t := range tests[1:] { - res = newOrTest(res, t) - } - return res -} - -// Matches implements the BloomTest interface -func (b stringTest) Matches(bloom filter.Checker) bool { - for _, ngram := range b.ngrams { - if !bloom.Test(ngram) { - return false - } - } - return true -} - -// MatchesWithPrefixBuf implements the BloomTest interface -func (b stringTest) MatchesWithPrefixBuf(bloom filter.Checker, buf []byte, prefixLen int) bool { - for _, ngram := range b.ngrams { - buf = append(buf[:prefixLen], ngram...) - if !bloom.Test(buf) { - return false - } - } - return true -} - -type stringMatcherFilter struct { - test BloomTest -} - -// Matches implements the log.Filterer interface -func (b stringMatcherFilter) Matches(test log.Checker) bool { - return b.test.Matches(logCheckerWrapper{test}) -} - -func newStringFilterFunc(b NGramBuilder) log.NewMatcherFiltererFunc { - return func(match []byte, _ bool) log.MatcherFilterer { - return log.WrapMatcher(stringMatcherFilter{ - test: newStringTest(b, string(match)), - }) - } -} - type orTest struct { left, right BloomTest } @@ -286,10 +71,12 @@ func newOrTest(left, right BloomTest) orTest { } } +// Matches implements BloomTest func (o orTest) Matches(bloom filter.Checker) bool { return o.left.Matches(bloom) || o.right.Matches(bloom) } +// MatchesWithPrefixBuf implements BloomTest func (o orTest) MatchesWithPrefixBuf(bloom filter.Checker, buf []byte, prefixLen int) bool { return o.left.MatchesWithPrefixBuf(bloom, buf, prefixLen) || o.right.MatchesWithPrefixBuf(bloom, buf, prefixLen) } @@ -305,28 +92,16 @@ func newAndTest(left, right BloomTest) andTest { } } +// Matches implements BloomTest func (a andTest) Matches(bloom filter.Checker) bool { return a.left.Matches(bloom) && a.right.Matches(bloom) } +// MatchesWithPrefixBuf implements BloomTest func (a andTest) MatchesWithPrefixBuf(bloom filter.Checker, buf []byte, prefixLen int) bool { return a.left.MatchesWithPrefixBuf(bloom, buf, prefixLen) && a.right.MatchesWithPrefixBuf(bloom, buf, prefixLen) } -func newPatternTest(b NGramBuilder, match string) BloomTest { - lit, err := pattern.ParseLiterals(match) - if err != nil { - return MatchAll - } - - var res BloomTests - - for _, l := range lit { - res = append(res, newStringTest(b, string(l))) - } - return res -} - func LabelMatchersToBloomTest(matchers ...LabelMatcher) BloomTest { tests := make(BloomTests, 0, len(matchers)) for _, matcher := range matchers { diff --git a/pkg/storage/bloom/v1/bloom_tester_test.go b/pkg/storage/bloom/v1/bloom_tester_test.go index 8c400947ad702..7a314872cc86e 100644 --- a/pkg/storage/bloom/v1/bloom_tester_test.go +++ b/pkg/storage/bloom/v1/bloom_tester_test.go @@ -10,116 +10,6 @@ import ( "github.com/grafana/loki/pkg/push" ) -type fakeLineBloom []string - -// fakeBloom is a fake bloom filter that matches tokens exactly. -// It uses a tokenizer to build the tokens for a line -func newFakeBloom(tokenizer *NGramTokenizer, line string) (res fakeLineBloom) { - toks := tokenizer.Tokens(line) - for toks.Next() { - res = append(res, string(toks.At())) - } - return -} - -func (f fakeLineBloom) Test(data []byte) bool { - str := string(data) - for _, match := range f { - if str == match { - return true - } - } - return false -} - -func TestBloomQueryingLogic(t *testing.T) { - // All tested on 4skip1 - n := 4 - skip := 1 - tokenizer := NewNGramTokenizer(n, skip) - - for _, tc := range []struct { - desc string - line string - query string - match bool - enabled bool - }{ - { - desc: "filter too short always match", - line: "foobar", - query: `{app="fake"} |= "z"`, - match: true, - }, - { - desc: "simple matcher", - line: "foobar", - query: `{app="fake"} |= "oobar"`, - match: true, - }, - { - desc: "longer sequence", - line: "abcdefghijklmnopqrstuvwxyz", - query: `{app="fake"} |= "nopqrstuvwxyz"`, - match: true, - }, - { - desc: "longer sequence nomatch", - line: "abcdefghijklmnopqrstuvwxyz", - query: `{app="fake"} |= "nopqrstuvwxyzzz"`, - match: false, - }, - { - desc: "pattern simple", - line: "abcdefghijklmnopqrstuvwxyz", - query: `{app="fake"} |> "<_>lmnopq<_>"`, - match: true, - }, - { - desc: "pattern too short matches", - line: "abcdefghijklmnopqrstuvwxyz", - query: `{app="fake"} |> "<_>zzz<_>"`, - match: true, - }, - { - desc: "pattern mix long success and short", - line: "abcdefghijklmnopqrstuvwxyz", - query: `{app="fake"} |> "<_>lmnop<_>zzz<_>"`, - match: true, - }, - { - desc: "pattern mix long fail and short", - line: "abcdefghijklmnopqrstuvwxyz", - query: `{app="fake"} |> "<_>zzzzz<_>zzz<_>"`, - match: false, - }, - { - desc: "regexp disabled", - line: "foobarbaz", - query: `{app="fake"} |~ "(aaaaa|bbbbb)bazz"`, - match: true, - }, - } { - - // shortcut to enable specific tests - tc.enabled = true - if !tc.enabled { - continue - } - t.Run(tc.desc, func(t *testing.T) { - bloom := newFakeBloom(tokenizer, tc.line) - expr, err := syntax.ParseExpr(tc.query) - require.NoError(t, err) - filters := ExtractTestableLineFilters(expr) - bloomTests := FiltersToBloomTest(tokenizer, filters...) - matched := bloomTests.Matches(bloom) - - require.Equal(t, tc.match, matched) - - }) - } -} - func TestLabelMatchersToBloomTest(t *testing.T) { // All test cases below have access to a fake bloom filter with // trace_id=exists_1 and trace_id=exists_2 @@ -219,7 +109,7 @@ type fakeMetadataBloom []string // fakeBloom is a fake bloom filter that matches tokens exactly. // It uses a tokenizer to build the tokens for a line -func newFakeMetadataBloom(tokenizer *StructuredMetadataTokenizer, kvs ...push.LabelAdapter) (res fakeLineBloom) { +func newFakeMetadataBloom(tokenizer *StructuredMetadataTokenizer, kvs ...push.LabelAdapter) (res fakeMetadataBloom) { for _, kv := range kvs { it := tokenizer.Tokens(kv) for it.Next() { diff --git a/pkg/storage/bloom/v1/fuse_test.go b/pkg/storage/bloom/v1/fuse_test.go index b246333021635..2b28256629d48 100644 --- a/pkg/storage/bloom/v1/fuse_test.go +++ b/pkg/storage/bloom/v1/fuse_test.go @@ -4,7 +4,6 @@ import ( "bytes" "context" "fmt" - "math" "sync" "testing" @@ -25,26 +24,26 @@ var BloomPagePool = mempool.New("test", []mempool.Bucket{ {Size: 16, Capacity: 512 << 10}, }, nil) -// TODO(owen-d): this is unhinged from the data it represents. I'm leaving this solely so I don't -// have to refactor tests here in order to fix this elsewhere, but it can/should be fixed -- -// the skip & n len are hardcoded based on data that's passed to it elsewhere. -// TODO(chaudum): Can be removed once matching with structured metadata is implemented. -type fakeNgramBuilder struct{} +type singleKeyTest []byte -func (f fakeNgramBuilder) N() int { return math.MaxInt } // do not tokenize -func (f fakeNgramBuilder) SkipFactor() int { return 0 } +// Matches implements BloomTest. +func (s singleKeyTest) Matches(bloom filter.Checker) bool { + return bloom.Test(s) +} -func (f fakeNgramBuilder) Tokens(key string) v2.Iterator[[]byte] { - return v2.NewSliceIter[[]byte]([][]byte{[]byte(key)}) +// MatchesWithPrefixBuf implements BloomTest. +func (s singleKeyTest) MatchesWithPrefixBuf(bloom filter.Checker, buf []byte, prefixLen int) bool { + return bloom.Test(append(buf[:prefixLen], s...)) } +// compiler check +var _ BloomTest = singleKeyTest("") + func keysToBloomTest(keys [][]byte) BloomTest { - var tokenizer fakeNgramBuilder tests := make(BloomTests, 0, len(keys)) for _, key := range keys { - tests = append(tests, newStringTest(tokenizer, string(key))) + tests = append(tests, singleKeyTest(key)) } - return tests } @@ -55,7 +54,7 @@ func TestFusedQuerier(t *testing.T) { writer := NewMemoryBlockWriter(indexBuf, bloomsBuf) reader := NewByteReader(indexBuf, bloomsBuf) numSeries := 1000 - data, keys := MkBasicSeriesWithBlooms(numSeries, 0x0000, 0xffff, 0, 10000) + data, _ := MkBasicSeriesWithBlooms(numSeries, 0x0000, 0xffff, 0, 10000) builder, err := NewBlockBuilder( BlockOptions{ @@ -91,7 +90,7 @@ func TestFusedQuerier(t *testing.T) { Fp: data[idx].Series.Fingerprint, Chks: data[idx].Series.Chunks, Response: ch, - Search: keysToBloomTest(keys[idx]), + Search: singleKeyTest("trace_id"), }) } inputs = append(inputs, reqs) @@ -132,20 +131,13 @@ func TestFusedQuerier(t *testing.T) { for i, input := range inputs { for j, req := range input { resp := resps[i][j] - require.Equal( - t, - Output{ - Fp: req.Fp, - Removals: nil, - }, - resp, - ) + require.Equal(t, Output{Fp: req.Fp, Removals: nil}, resp) } } } // Successfully query series across multiple pages as well as series that only occupy 1 bloom -func TestFuseMultiPage(t *testing.T) { +func TestFusedQuerier_MultiPage(t *testing.T) { indexBuf := bytes.NewBuffer(nil) bloomsBuf := bytes.NewBuffer(nil) writer := NewMemoryBlockWriter(indexBuf, bloomsBuf) @@ -215,13 +207,11 @@ func TestFuseMultiPage(t *testing.T) { chans[i] = make(chan Output, 1) // buffered once to not block in test } - req := func(ngram []byte, ch chan Output) Request { + req := func(key []byte, ch chan Output) Request { return Request{ - Fp: fp, - Chks: []ChunkRef{chk}, - Search: stringTest{ - ngrams: [][]byte{ngram}, - }, + Fp: fp, + Chks: []ChunkRef{chk}, + Search: singleKeyTest(key), Response: ch, Recorder: NewBloomRecorder(context.Background(), "unknown"), } @@ -357,7 +347,7 @@ func TestLazyBloomIter_Seek_ResetError(t *testing.T) { } } -func TestFusedQuerierSkipsEmptyBlooms(t *testing.T) { +func TestFusedQuerier_SkipsEmptyBlooms(t *testing.T) { // references for linking in memory reader+writer indexBuf := bytes.NewBuffer(nil) bloomsBuf := bytes.NewBuffer(nil) From b53b846359b6d26e3687958ef867eb5e9cc3915c Mon Sep 17 00:00:00 2001 From: Christian Haudum Date: Thu, 19 Sep 2024 14:21:17 +0200 Subject: [PATCH 3/6] Remove ngram tokenizer Signed-off-by: Christian Haudum --- pkg/storage/bloom/v1/bloom_tokenizer.go | 22 -- pkg/storage/bloom/v1/bloom_tokenizer_test.go | 57 +---- pkg/storage/bloom/v1/fuse_test.go | 16 +- pkg/storage/bloom/v1/tokenizer.go | 121 ---------- pkg/storage/bloom/v1/tokenizer_test.go | 225 ------------------- 5 files changed, 12 insertions(+), 429 deletions(-) diff --git a/pkg/storage/bloom/v1/bloom_tokenizer.go b/pkg/storage/bloom/v1/bloom_tokenizer.go index 3d04728937cf0..939c91c214398 100644 --- a/pkg/storage/bloom/v1/bloom_tokenizer.go +++ b/pkg/storage/bloom/v1/bloom_tokenizer.go @@ -45,28 +45,6 @@ func NewBloomTokenizer(maxBloomSize int, metrics *Metrics, logger log.Logger) *B } } -// prefixedToken returns a byte slice with sufficient capacity for a chunk-ref prefixed token -// of specific ngram length, along with the length of the prefix. -// It ensures enough capacity for the prefix and the token so additional tokens can be created -// without allocations by appending them to the prefix length -// If the buffer is nil or too small, a new one is created. The buffer is returned for reuse. -func prefixedToken(ngram int, chk ChunkRef, buf []byte) ([]byte, int) { - enc := encoding.EncWith(buf) - enc.Reset() - enc.PutBE64(uint64(chk.From)) - enc.PutBE64(uint64(chk.Through)) - enc.PutBE32(chk.Checksum) - prefixLn := enc.Len() // record the length of the prefix - - // If the buffer is too small, ensure enough capacity for the ngram - if cap(enc.Get()) < prefixLn+ngram*MaxRuneLen { - enc.PutBytes(make([]byte, ngram*MaxRuneLen)) - } - - // return the underlying byte slice and the length of the prefix - return enc.Get(), prefixLn -} - // ChunkRefWithIter is a wrapper around a ChunkRef and an EntryIterator. type ChunkRefWithIter struct { Ref ChunkRef diff --git a/pkg/storage/bloom/v1/bloom_tokenizer_test.go b/pkg/storage/bloom/v1/bloom_tokenizer_test.go index c2ae166ad67bf..79eb74033dd74 100644 --- a/pkg/storage/bloom/v1/bloom_tokenizer_test.go +++ b/pkg/storage/bloom/v1/bloom_tokenizer_test.go @@ -20,7 +20,6 @@ import ( "github.com/grafana/loki/pkg/push" - "github.com/prometheus/common/model" "github.com/stretchr/testify/require" "github.com/grafana/loki/v3/pkg/storage/bloom/v1/filter" @@ -28,54 +27,7 @@ import ( "github.com/prometheus/client_golang/prometheus" ) -var ( - four = NewNGramTokenizer(4, 0) - metrics = NewMetrics(prometheus.DefaultRegisterer) -) - -func TestPrefixedKeyCreation(t *testing.T) { - t.Parallel() - var ones uint64 = 0xffffffffffffffff - - ref := ChunkRef{ - From: 0, - Through: model.Time(int64(ones)), - Checksum: 0xffffffff, - } - for _, tc := range []struct { - desc string - ngram, expLen int - }{ - { - desc: "0-gram", - ngram: 0, - expLen: 20, - }, - { - desc: "4-gram", - ngram: 4, - expLen: 20 + 4*MaxRuneLen, - }, - } { - t.Run(tc.desc, func(t *testing.T) { - token, prefixLn := prefixedToken(tc.ngram, ref, nil) - require.Equal(t, 20, prefixLn) - require.Equal(t, tc.expLen, len(token)) - // first 8 bytes should be zeros from `from` - for i := 0; i < 8; i++ { - require.Equal(t, byte(0), token[i]) - } - // next 8 bytes should be ones from `through` - for i := 8; i < 16; i++ { - require.Equal(t, byte(255), token[i]) - } - // next 4 bytes should be ones from `checksum` - for i := 16; i < 20; i++ { - require.Equal(t, byte(255), token[i]) - } - }) - } -} +var metrics = NewMetrics(prometheus.DefaultRegisterer) func TestTokenizerPopulate(t *testing.T) { t.Parallel() @@ -249,7 +201,6 @@ func populateAndConsumeBloom( func BenchmarkPopulateSeriesWithBloom(b *testing.B) { for i := 0; i < b.N; i++ { - var testLine = lorem + lorem + lorem bt := NewBloomTokenizer(0, metrics, logger.NewNopLogger()) sbf := filter.NewScalableBloomFilter(1024, 0.01, 0.8) @@ -257,7 +208,11 @@ func BenchmarkPopulateSeriesWithBloom(b *testing.B) { memChunk := chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, compression.EncSnappy, chunkenc.ChunkHeadFormatFor(chunkenc.ChunkFormatV4), 256000, 1500000) _, _ = memChunk.Append(&push.Entry{ Timestamp: time.Unix(0, 1), - Line: testLine, + Line: "", + StructuredMetadata: push.LabelsAdapter{ + push.LabelAdapter{Name: "trace_id", Value: fmt.Sprintf("%04x", i)}, + push.LabelAdapter{Name: "org_id", Value: fmt.Sprintf("%d", i%1000)}, + }, }) itr, err := memChunk.Iterator( context.Background(), diff --git a/pkg/storage/bloom/v1/fuse_test.go b/pkg/storage/bloom/v1/fuse_test.go index 2b28256629d48..ec4f575fc22a8 100644 --- a/pkg/storage/bloom/v1/fuse_test.go +++ b/pkg/storage/bloom/v1/fuse_test.go @@ -167,20 +167,20 @@ func TestFusedQuerier_MultiPage(t *testing.T) { Chunks: []ChunkRef{chk}, } - buf, prefixLn := prefixedToken(3, chk, nil) + buf := prefixForChunkRef(chk) b1 := &Bloom{ *filter.NewScalableBloomFilter(1024, 0.01, 0.8), } key1, key2 := []byte("foo"), []byte("bar") b1.Add(key1) - b1.Add(append(buf[:prefixLn], key1...)) + b1.Add(append(buf, key1...)) b2 := &Bloom{ *filter.NewScalableBloomFilter(1024, 0.01, 0.8), } b2.Add(key2) - b2.Add(append(buf[:prefixLn], key2...)) + b2.Add(append(buf, key2...)) _, err = builder.BuildFrom(v2.NewSliceIter([]SeriesWithBlooms{ { @@ -257,7 +257,7 @@ func TestLazyBloomIter_Seek_ResetError(t *testing.T) { numSeries := 4 data := make([]SeriesWithBlooms, 0, numSeries) - tokenizer := NewNGramTokenizer(4, 0) + for i := 0; i < numSeries; i++ { var series Series series.Fingerprint = model.Fingerprint(i) @@ -280,12 +280,8 @@ func TestLazyBloomIter_Seek_ResetError(t *testing.T) { } for j := 0; j < nLines; j++ { - line := fmt.Sprintf("%04x:%04x", i, j) - it := tokenizer.Tokens(line) - for it.Next() { - key := it.At() - bloom.Add(key) - } + key := fmt.Sprintf("%04x:%04x", i, j) + bloom.Add([]byte(key)) } data = append(data, SeriesWithBlooms{ diff --git a/pkg/storage/bloom/v1/tokenizer.go b/pkg/storage/bloom/v1/tokenizer.go index 5cbf199448f68..59654747832d8 100644 --- a/pkg/storage/bloom/v1/tokenizer.go +++ b/pkg/storage/bloom/v1/tokenizer.go @@ -2,17 +2,12 @@ package v1 import ( "fmt" - "unicode/utf8" iter "github.com/grafana/loki/v3/pkg/iter/v2" "github.com/grafana/loki/pkg/push" ) -const ( - MaxRuneLen = 4 -) - type StructuredMetadataTokenizer struct { // prefix to add to tokens, typically the encoded chunkref prefix string @@ -26,7 +21,6 @@ func NewStructuredMetadataTokenizer(prefix string) *StructuredMetadataTokenizer } } -// Tokens implements the NGramBuilder interface func (t *StructuredMetadataTokenizer) Tokens(kv push.LabelAdapter) iter.Iterator[string] { combined := fmt.Sprintf("%s=%s", kv.Name, kv.Value) t.tokens = append(t.tokens[:0], @@ -36,118 +30,3 @@ func (t *StructuredMetadataTokenizer) Tokens(kv push.LabelAdapter) iter.Iterator ) return iter.NewSliceIter(t.tokens) } - -func reassemble(buf []rune, ln, pos int, result []byte) []byte { - result = result[:0] // Reset the result slice - for i := 0; i < ln; i++ { - cur := pos % len(buf) - pos++ - result = utf8.AppendRune(result, buf[cur]) - } - return result -} - -// Iterable variants (more performant, less space) -type NGramTokenizer struct { - n, skip int - buffer []rune // circular buffer used for ngram generation - res []byte // buffer used for token generation -} - -func (t *NGramTokenizer) N() int { - return t.n -} - -func (t *NGramTokenizer) SkipFactor() int { - return t.skip -} - -/* -N-Grams (https://en.wikipedia.org/wiki/N-gram) are a series of 'n' adjacent characters in a string. -These will be utilized for the bloom filters to allow for fuzzy searching. -*/ -func NewNGramTokenizer(n, skip int) *NGramTokenizer { - t := &NGramTokenizer{ - n: n, - skip: skip, - buffer: make([]rune, n+skip), - res: make([]byte, 0, n*MaxRuneLen), // maximum 4 bytes per rune - } - - return t -} - -// Token implements the NGramBuilder interface -// The Token iterator uses shared buffers for performance. The []byte returned by At() -// is not safe for use after subsequent calls to Next() -func (t *NGramTokenizer) Tokens(line string) iter.Iterator[[]byte] { - return &NGramTokenIter{ - n: t.N(), - skip: t.SkipFactor(), - - line: line, - - buffer: t.buffer, - res: t.res, - } -} - -type NGramTokenIter struct { - n, skip int - - runeIndex, offset int - line string // source - - buffer []rune // circular buffers used for ngram generation - res []byte -} - -func (t *NGramTokenIter) Next() bool { - for i, r := range t.line[t.offset:] { - t.buffer[t.runeIndex%len(t.buffer)] = r - t.runeIndex++ - - if t.runeIndex < t.n { - continue - } - - // if the start of the ngram is at the interval of our skip factor, emit it. - // we increment the skip due to modulo logic: - // because `n % 0 is a divide by zero and n % 1 is always 0` - if (t.runeIndex-t.n)%(t.skip+1) == 0 { - // update the offset, but don't go past the end of the line; - // for instance invalid utf-8 - t.offset = min(len(t.line), t.offset+i+utf8.RuneLen(r)) - return true - } - - } - return false -} - -func (t *NGramTokenIter) At() []byte { - return reassemble(t.buffer, t.n, (t.runeIndex-t.n)%len(t.buffer), t.res[:0]) -} - -func (t *NGramTokenIter) Err() error { - return nil -} - -type PrefixedTokenIter struct { - buf []byte - prefixLen int - - iter.Iterator[[]byte] -} - -func (t *PrefixedTokenIter) At() []byte { - return append(t.buf[:t.prefixLen], t.Iterator.At()...) -} - -func NewPrefixedTokenIter(buf []byte, prefixLn int, itr iter.Iterator[[]byte]) *PrefixedTokenIter { - return &PrefixedTokenIter{ - buf: buf, - prefixLen: prefixLn, - Iterator: itr, - } -} diff --git a/pkg/storage/bloom/v1/tokenizer_test.go b/pkg/storage/bloom/v1/tokenizer_test.go index f21aceca06402..0aeb0ba1f5510 100644 --- a/pkg/storage/bloom/v1/tokenizer_test.go +++ b/pkg/storage/bloom/v1/tokenizer_test.go @@ -2,7 +2,6 @@ package v1 import ( "testing" - "unicode/utf8" "github.com/stretchr/testify/require" @@ -11,230 +10,6 @@ import ( "github.com/grafana/loki/pkg/push" ) -const BigFile = "../../../logql/sketch/testdata/war_peace.txt" - -func TestNGramIterator(t *testing.T) { - t.Parallel() - var ( - three = NewNGramTokenizer(3, 0) - threeSkip1 = NewNGramTokenizer(3, 1) - threeSkip3 = NewNGramTokenizer(3, 3) - ) - - for _, tc := range []struct { - desc string - t *NGramTokenizer - input string - exp []string - }{ - { - t: three, - input: "", - exp: []string{}, - }, - { - t: three, - input: "ab", - exp: []string{}, - }, - { - t: three, - input: "abcdefg", - exp: []string{"abc", "bcd", "cde", "def", "efg"}, - }, - { - t: threeSkip1, - input: "abcdefg", - exp: []string{"abc", "cde", "efg"}, - }, - { - t: threeSkip3, - input: "abcdefgh", - exp: []string{"abc", "efg"}, - }, - { - t: three, - input: "日本語", - exp: []string{"日本語"}, - }, - { - t: four, - input: "日本語日本語", - exp: []string{ - "日本語日", - "本語日本", - "語日本語"}, - }, - } { - t.Run(tc.desc, func(t *testing.T) { - itr := tc.t.Tokens(tc.input) - for _, exp := range tc.exp { - require.True(t, itr.Next()) - require.Equal(t, exp, string(itr.At())) - } - require.False(t, itr.Next()) - }) - } -} - -// Mainly this ensures we don't panic when a string ends in invalid utf8 -func TestInvalidUTF8(t *testing.T) { - x := NewNGramTokenizer(3, 0) - - input := "abc\x80" - require.False(t, utf8.ValidString(input)) - itr := x.Tokens(input) - require.True(t, itr.Next()) - require.Equal(t, []byte("abc"), itr.At()) - require.True(t, itr.Next()) - // we don't really care about the final rune returned and it's probably not worth the perf cost - // to check for it - require.Equal(t, []byte{0x62, 0x63, 0xef, 0xbf, 0xbd}, itr.At()) - require.False(t, itr.Next()) -} - -func TestPrefixedIterator(t *testing.T) { - t.Parallel() - var ( - three = NewNGramTokenizer(3, 0) - ) - - for _, tc := range []struct { - desc string - input string - exp []string - }{ - { - input: "", - exp: []string{}, - }, - { - input: "ab", - exp: []string{}, - }, - { - input: "abcdefg", - exp: []string{"0123abc", "0123bcd", "0123cde", "0123def", "0123efg"}, - }, - - { - input: "日本語", - exp: []string{"0123日本語"}, - }, - } { - prefix := []byte("0123") - t.Run(tc.desc, func(t *testing.T) { - itr := NewPrefixedTokenIter(prefix, len(prefix), three.Tokens(tc.input)) - for _, exp := range tc.exp { - require.True(t, itr.Next()) - require.Equal(t, exp, string(itr.At())) - } - require.False(t, itr.Next()) - }) - } -} - -const lorem = ` -lorum ipsum dolor sit amet consectetur adipiscing elit sed do eiusmod tempor incididunt ut labore et dolore magna -aliqua ut enim ad minim veniam quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat -duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur excepteur -sint occaecat cupidatat non proident sunt in culpa qui officia deserunt mollit anim id est -laborum ipsum dolor sit amet consectetur adipiscing elit sed do eiusmod tempor incididunt ut labore et dolore magna -aliqua ut enim ad minim veniam quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat -duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur excepteur -sint occaecat cupidatat non proident sunt in culpa qui officia deserunt mollit anim id est -` - -func BenchmarkTokens(b *testing.B) { - var ( - v2Three = NewNGramTokenizer(3, 0) - v2ThreeSkip1 = NewNGramTokenizer(3, 1) - ) - - type impl struct { - desc string - f func() - } - type tc struct { - desc string - impls []impl - } - for _, tc := range []tc{ - { - desc: "three", - impls: []impl{ - { - desc: "v2", - f: func() { - itr := v2Three.Tokens(lorem) - for itr.Next() { - _ = itr.At() - } - }, - }, - }, - }, - { - desc: "threeSkip1", - impls: []impl{ - { - desc: "v2", - f: func() { - itr := v2ThreeSkip1.Tokens(lorem) - for itr.Next() { - _ = itr.At() - } - }, - }, - }, - }, - { - desc: "threeChunk", - impls: []impl{ - { - desc: "v2", - f: func() func() { - buf, prefixLn := prefixedToken(v2Three.N(), ChunkRef{}, nil) - return func() { - itr := NewPrefixedTokenIter(buf, prefixLn, v2Three.Tokens(lorem)) - for itr.Next() { - _ = itr.At() - } - } - }(), - }, - }, - }, - { - desc: "threeSkip1Chunk", - impls: []impl{ - { - desc: "v2", - f: func() func() { - buf, prefixLn := prefixedToken(v2Three.N(), ChunkRef{}, nil) - return func() { - itr := NewPrefixedTokenIter(buf, prefixLn, v2ThreeSkip1.Tokens(lorem)) - for itr.Next() { - _ = itr.At() - } - } - }(), - }, - }, - }, - } { - b.Run(tc.desc, func(b *testing.B) { - for _, impl := range tc.impls { - b.Run(impl.desc, func(b *testing.B) { - for i := 0; i < b.N; i++ { - impl.f() - } - }) - } - }) - } -} - func TestStructuredMetadataTokenizer(t *testing.T) { tokenizer := NewStructuredMetadataTokenizer("chunk") From a8b465f266fbf7a7e56bc70f4c778305b4ad5d44 Mon Sep 17 00:00:00 2001 From: Christian Haudum Date: Thu, 19 Sep 2024 15:38:43 +0200 Subject: [PATCH 4/6] Remove unused spec.go file Signed-off-by: Christian Haudum --- pkg/storage/bloom/spec.go | 29 ----------------------------- 1 file changed, 29 deletions(-) delete mode 100644 pkg/storage/bloom/spec.go diff --git a/pkg/storage/bloom/spec.go b/pkg/storage/bloom/spec.go deleted file mode 100644 index 19f5940ddfbc2..0000000000000 --- a/pkg/storage/bloom/spec.go +++ /dev/null @@ -1,29 +0,0 @@ -package bloom - -import "github.com/prometheus/common/model" - -type Metadata interface { - Version() uint32 - NumSeries() uint64 - NumChunks() uint64 - Size() uint64 // bytes - - // timestamps - From() int64 - Through() int64 - - // series - FromFingerprint() model.Fingerprint - ThroughFingerprint() model.Fingerprint -} - -type Iterator[K any, V any] interface { - Next() bool - Err() error - At() V - Seek(K) Iterator[K, V] -} - -type Block interface { - SeriesIterator() Iterator[model.Fingerprint, []byte] -} From 8870a90540a6b317f74b4093662363bb2cd6907c Mon Sep 17 00:00:00 2001 From: Christian Haudum Date: Thu, 19 Sep 2024 15:46:17 +0200 Subject: [PATCH 5/6] fixup! Remove n-gram length and n-gram skip Signed-off-by: Christian Haudum --- docs/sources/shared/configuration.md | 14 -------------- 1 file changed, 14 deletions(-) diff --git a/docs/sources/shared/configuration.md b/docs/sources/shared/configuration.md index d50669016356c..d17d6ed5591cb 100644 --- a/docs/sources/shared/configuration.md +++ b/docs/sources/shared/configuration.md @@ -3713,20 +3713,6 @@ shard_streams: # CLI flag: -bloom-build.split-keyspace-by [bloom_split_series_keyspace_by: | default = 256] -# Experimental. Length of the n-grams created when computing blooms from log -# lines. -# CLI flag: -bloom-build.ngram-length -[bloom_ngram_length: | default = 4] - -# Experimental. Skip factor for the n-grams created when computing blooms from -# log lines. -# CLI flag: -bloom-build.ngram-skip -[bloom_ngram_skip: | default = 1] - -# Experimental. Scalable Bloom Filter desired false-positive rate. -# CLI flag: -bloom-build.false-positive-rate -[bloom_false_positive_rate: | default = 0.01] - # Experimental. Compression algorithm for bloom block pages. # CLI flag: -bloom-build.block-encoding [bloom_block_encoding: | default = "none"] From 9b8bb26f548060daed9ba841583d3c7eea00a9df Mon Sep 17 00:00:00 2001 From: Christian Haudum Date: Thu, 19 Sep 2024 16:30:01 +0200 Subject: [PATCH 6/6] Remove block backwards compatibility Signed-off-by: Christian Haudum --- pkg/storage/bloom/v1/schema.go | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/pkg/storage/bloom/v1/schema.go b/pkg/storage/bloom/v1/schema.go index ae2f8fb6a251d..954c96f757d6c 100644 --- a/pkg/storage/bloom/v1/schema.go +++ b/pkg/storage/bloom/v1/schema.go @@ -63,8 +63,8 @@ func (s Schema) Version() Version { // byte length func (s Schema) Len() int { - // magic number + version + encoding + ngram length + ngram skip - return 4 + 1 + 1 + 8 + 8 + // magic number + version + encoding + return 4 + 1 + 1 } func (s *Schema) DecompressorPool() compression.ReaderPool { @@ -80,9 +80,6 @@ func (s *Schema) Encode(enc *encoding.Encbuf) { enc.PutBE32(magicNumber) enc.PutByte(byte(s.version)) enc.PutByte(byte(s.encoding)) - // kept to keep compatibility - enc.PutBE64(0) // previously n-gram length - enc.PutBE64(0) // previously n-gram skip } @@ -113,9 +110,5 @@ func (s *Schema) Decode(dec *encoding.Decbuf) error { return errors.Wrap(err, "parsing encoding") } - // kept to keep compatibility - _ = dec.Be64() // previously n-gram length - _ = dec.Be64() // previously n-gram skip - return dec.Err() }