Configurable NGramLen, NGramSkip and MaxBlockSize
salvacorts committed Feb 7, 2024
1 parent 37b6ba8 commit 53c9865
Showing 7 changed files with 46 additions and 26 deletions.
5 changes: 3 additions & 2 deletions pkg/bloomcompactor/config.go
@@ -44,7 +44,8 @@ type Limits interface {
 	BloomCompactorChunksBatchSize(userID string) int
 	BloomCompactorMaxTableAge(tenantID string) time.Duration
 	BloomCompactorEnabled(tenantID string) bool
-	BloomNGramLength(tenantID string) int
-	BloomNGramSkip(tenantID string) int
+	BloomNGramLength(tenantID string) uint64
+	BloomNGramSkip(tenantID string) uint64
 	BloomFalsePositiveRate(tenantID string) float64
+	BloomCompactorMaxBlockSize(tenantID string) uint64
 }
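
Every implementation of this interface now has to return uint64 for the n-gram settings and provide the new max-block-size getter. A minimal sketch of a test double covering just the methods touched by this commit (the stub type itself is hypothetical, not part of the change):

// testLimits is a hypothetical stub for the per-tenant methods changed in
// this commit; it returns fixed values regardless of tenant.
type testLimits struct{}

func (testLimits) BloomNGramLength(tenantID string) uint64           { return 4 }
func (testLimits) BloomNGramSkip(tenantID string) uint64             { return 0 }
func (testLimits) BloomCompactorMaxBlockSize(tenantID string) uint64 { return 200 << 20 } // ~200MB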
13 changes: 12 additions & 1 deletion pkg/bloomcompactor/controller.go
@@ -15,6 +15,8 @@ import (
 )
 
 type SimpleBloomController struct {
+	tenant         string
+	limits         Limits
 	ownershipRange v1.FingerprintBounds // ownership range of this controller
 	tsdbStore      TSDBStore
 	metaStore      MetaStore
@@ -28,6 +30,8 @@ type SimpleBloomController struct {
 }
 
 func NewSimpleBloomController(
+	tenant string,
+	limits Limits,
 	ownershipRange v1.FingerprintBounds,
 	tsdbStore TSDBStore,
 	metaStore MetaStore,
@@ -38,6 +42,8 @@ func NewSimpleBloomController(
 	logger log.Logger,
 ) *SimpleBloomController {
 	return &SimpleBloomController{
+		tenant:         tenant,
+		limits:         limits,
 		ownershipRange: ownershipRange,
 		tsdbStore:      tsdbStore,
 		metaStore:      metaStore,
@@ -95,6 +101,11 @@ func (s *SimpleBloomController) do(ctx context.Context) error {
 		return errors.Wrap(err, "failed to create plan")
 	}
 
+	nGramSize := s.limits.BloomNGramLength(s.tenant)
+	nGramSkip := s.limits.BloomNGramSkip(s.tenant)
+	maxBlockSize := s.limits.BloomCompactorMaxBlockSize(s.tenant)
+	blockOpts := v1.NewBlockOptions(nGramSize, nGramSkip, maxBlockSize)
+
 	// 5. Generate Blooms
 	// Now that we have the gaps, we will generate a bloom block for each gap.
 	// We can accelerate this by using existing blocks which may already contain
@@ -122,7 +133,7 @@ func (s *SimpleBloomController) do(ctx context.Context) error {
 		}
 
 		gen := NewSimpleBloomGenerator(
-			v1.DefaultBlockOptions, // TODO(salvacorts) make block options configurable
+			blockOpts,
 			seriesItr,
 			s.chunkLoader,
 			preExistingBlocks,
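
The block options are now derived per tenant right before bloom generation, replacing the hard-coded v1.DefaultBlockOptions. Read as a small helper, the added lines amount to the following sketch (the helper itself is hypothetical, and it assumes v1.NewBlockOptions returns a v1.BlockOptions value):

// blockOptionsFor is a hypothetical helper mirroring the lines added to
// do() above: resolve the three per-tenant settings, then bundle them into
// the options handed to NewSimpleBloomGenerator.
func blockOptionsFor(tenant string, limits Limits) v1.BlockOptions {
	return v1.NewBlockOptions(
		limits.BloomNGramLength(tenant),           // n-gram length
		limits.BloomNGramSkip(tenant),             // n-gram skip
		limits.BloomCompactorMaxBlockSize(tenant), // max block size in bytes
	)
}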
2 changes: 1 addition & 1 deletion pkg/storage/bloom/v1/block.go
@@ -220,7 +220,7 @@ func (bq *BlockQuerier) CheckChunksForSeries(fp model.Fingerprint, chks ChunkRef
 outer:
 	for _, chk := range inBlooms {
 		// Get buf to concatenate the chunk and search token
-		tokenBuf, prefixLen = prefixedToken(schema.NGramLen(), chk, tokenBuf)
+		tokenBuf, prefixLen = prefixedToken(int(schema.NGramLen()), chk, tokenBuf)
 		for _, search := range searches {
 			tokenBuf = append(tokenBuf[:prefixLen], search...)
 
4 changes: 2 additions & 2 deletions pkg/storage/bloom/v1/bloom_tokenizer.go
@@ -35,13 +35,13 @@ const eightBits = 8
 // 1) The token slices generated must not be mutated externally
 // 2) The token slice must not be used after the next call to `Tokens()` as it will repopulate the slice.
 // 2) This is not thread safe.
-func NewBloomTokenizer(nGramLen, nGramSkip int, metrics *Metrics) *BloomTokenizer {
+func NewBloomTokenizer(nGramLen, nGramSkip uint64, metrics *Metrics) *BloomTokenizer {
 	// TODO(chaudum): Replace logger
 	level.Info(util_log.Logger).Log("msg", "create new bloom tokenizer", "ngram length", nGramLen, "ngram skip", nGramSkip)
 	return &BloomTokenizer{
 		metrics:       metrics,
 		cache:         make(map[string]interface{}, cacheSize),
-		lineTokenizer: NewNGramTokenizer(nGramLen, nGramSkip),
+		lineTokenizer: NewNGramTokenizer(int(nGramLen), int(nGramSkip)),
 	}
 }
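
For intuition on the two knobs: the n-gram length is the width of the sliding window over each log line, and the skip is how many positions are stepped over between emitted windows. A standalone illustration of that interpretation (an assumption about the semantics, not Loki's actual tokenizer):

// ngrams is an illustrative tokenizer: it emits every n-rune window of
// line, advancing skip+1 runes between emitted windows.
func ngrams(line string, n, skip int) []string {
	runes := []rune(line)
	var out []string
	for i := 0; i+n <= len(runes); i += skip + 1 {
		out = append(out, string(runes[i:i+n]))
	}
	return out
}

// ngrams("abcdef", 4, 0) -> ["abcd", "bcde", "cdef"]
// ngrams("abcdef", 4, 1) -> ["abcd", "cdef"]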
2 changes: 1 addition & 1 deletion pkg/storage/bloom/v1/fuse.go
@@ -131,7 +131,7 @@ func (fq *FusedQuerier) Run() error {
 chunkLoop:
 	for _, chk := range inBlooms {
 		// Get buf to concatenate the chunk and search token
-		tokenBuf, prefixLen = prefixedToken(schema.NGramLen(), chk, tokenBuf)
+		tokenBuf, prefixLen = prefixedToken(int(schema.NGramLen()), chk, tokenBuf)
 		for _, search := range input.Searches {
 			tokenBuf = append(tokenBuf[:prefixLen], search...)
 
8 changes: 4 additions & 4 deletions pkg/storage/bloom/v1/index.go
@@ -26,12 +26,12 @@ func (s Schema) Compatible(other Schema) bool {
 	return s == other
 }
 
-func (s Schema) NGramLen() int {
-	return int(s.nGramLength)
+func (s Schema) NGramLen() uint64 {
+	return s.nGramLength
 }
 
-func (s Schema) NGramSkip() int {
-	return int(s.nGramSkip)
+func (s Schema) NGramSkip() uint64 {
+	return s.nGramSkip
 }
 
 // byte length
38 changes: 23 additions & 15 deletions pkg/validation/limits.go
@@ -56,6 +56,7 @@ const (
 
 	defaultMaxStructuredMetadataSize  = "64kb"
 	defaultMaxStructuredMetadataCount = 128
+	defaultBloomCompactorMaxBlockSize = "200MB"
 )
 
 // Limits describe all the limits for users; can be used to describe global default
@@ -187,15 +188,16 @@ type Limits struct {
 	BloomGatewayShardSize int `yaml:"bloom_gateway_shard_size" json:"bloom_gateway_shard_size"`
 	BloomGatewayEnabled bool `yaml:"bloom_gateway_enable_filtering" json:"bloom_gateway_enable_filtering"`
 
-	BloomCompactorShardSize int `yaml:"bloom_compactor_shard_size" json:"bloom_compactor_shard_size"`
-	BloomCompactorMaxTableAge time.Duration `yaml:"bloom_compactor_max_table_age" json:"bloom_compactor_max_table_age"`
-	BloomCompactorEnabled bool `yaml:"bloom_compactor_enable_compaction" json:"bloom_compactor_enable_compaction"`
-	BloomCompactorChunksBatchSize int `yaml:"bloom_compactor_chunks_batch_size" json:"bloom_compactor_chunks_batch_size"`
-	BloomNGramLength int `yaml:"bloom_ngram_length" json:"bloom_ngram_length"`
-	BloomNGramSkip int `yaml:"bloom_ngram_skip" json:"bloom_ngram_skip"`
-	BloomFalsePositiveRate float64 `yaml:"bloom_false_positive_rate" json:"bloom_false_positive_rate"`
-	BloomGatewayBlocksDownloadingParallelism int `yaml:"bloom_gateway_blocks_downloading_parallelism" json:"bloom_gateway_blocks_downloading_parallelism"`
-	BloomGatewayCacheKeyInterval time.Duration `yaml:"bloom_gateway_cache_key_interval" json:"bloom_gateway_cache_key_interval"`
+	BloomCompactorShardSize int `yaml:"bloom_compactor_shard_size" json:"bloom_compactor_shard_size"`
+	BloomCompactorMaxTableAge time.Duration `yaml:"bloom_compactor_max_table_age" json:"bloom_compactor_max_table_age"`
+	BloomCompactorEnabled bool `yaml:"bloom_compactor_enable_compaction" json:"bloom_compactor_enable_compaction"`
+	BloomCompactorChunksBatchSize int `yaml:"bloom_compactor_chunks_batch_size" json:"bloom_compactor_chunks_batch_size"`
+	BloomNGramLength uint64 `yaml:"bloom_ngram_length" json:"bloom_ngram_length"`
+	BloomNGramSkip uint64 `yaml:"bloom_ngram_skip" json:"bloom_ngram_skip"`
+	BloomFalsePositiveRate float64 `yaml:"bloom_false_positive_rate" json:"bloom_false_positive_rate"`
+	BloomGatewayBlocksDownloadingParallelism int `yaml:"bloom_gateway_blocks_downloading_parallelism" json:"bloom_gateway_blocks_downloading_parallelism"`
+	BloomGatewayCacheKeyInterval time.Duration `yaml:"bloom_gateway_cache_key_interval" json:"bloom_gateway_cache_key_interval"`
+	BloomCompactorMaxBlockSize flagext.ByteSize `yaml:"bloom_compactor_max_block_size" json:"bloom_compactor_max_block_size"`
 
 	AllowStructuredMetadata bool `yaml:"allow_structured_metadata,omitempty" json:"allow_structured_metadata,omitempty" doc:"description=Allow user to send structured metadata in push payload."`
 	MaxStructuredMetadataSize flagext.ByteSize `yaml:"max_structured_metadata_size" json:"max_structured_metadata_size" doc:"description=Maximum size accepted for structured metadata per log line."`
@@ -328,11 +330,13 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
 	f.DurationVar(&l.BloomCompactorMaxTableAge, "bloom-compactor.max-table-age", 7*24*time.Hour, "The maximum age of a table before it is compacted. Do not compact tables older than the the configured time. Default to 7 days. 0s means no limit.")
 	f.BoolVar(&l.BloomCompactorEnabled, "bloom-compactor.enable-compaction", false, "Whether to compact chunks into bloom filters.")
 	f.IntVar(&l.BloomCompactorChunksBatchSize, "bloom-compactor.chunks-batch-size", 100, "The batch size of the chunks the bloom-compactor downloads at once.")
-	f.IntVar(&l.BloomNGramLength, "bloom-compactor.ngram-length", 4, "Length of the n-grams created when computing blooms from log lines.")
-	f.IntVar(&l.BloomNGramSkip, "bloom-compactor.ngram-skip", 0, "Skip factor for the n-grams created when computing blooms from log lines.")
+	f.Uint64Var(&l.BloomNGramLength, "bloom-compactor.ngram-length", 4, "Length of the n-grams created when computing blooms from log lines.")
+	f.Uint64Var(&l.BloomNGramSkip, "bloom-compactor.ngram-skip", 0, "Skip factor for the n-grams created when computing blooms from log lines.")
 	f.Float64Var(&l.BloomFalsePositiveRate, "bloom-compactor.false-positive-rate", 0.01, "Scalable Bloom Filter desired false-positive rate.")
 	f.IntVar(&l.BloomGatewayBlocksDownloadingParallelism, "bloom-gateway.blocks-downloading-parallelism", 50, "Maximum number of blocks will be downloaded in parallel by the Bloom Gateway.")
 	f.DurationVar(&l.BloomGatewayCacheKeyInterval, "bloom-gateway.cache-key-interval", 15*time.Minute, "Interval for computing the cache key in the Bloom Gateway.")
+	_ = l.BloomCompactorMaxBlockSize.Set(defaultBloomCompactorMaxBlockSize)
+	f.Var(&l.BloomCompactorMaxBlockSize, "bloom-compactor.max-block-size", "The maximum bloom block size. A value of 0 sets an unlimited size. Default is 200MB. The actual block size might exceed this limit since blooms will be added to blocks until the block exceeds the maximum block size.")
 
 	l.ShardStreams = &shardstreams.Config{}
 	l.ShardStreams.RegisterFlagsWithPrefix("shard-streams", f)
@@ -874,12 +878,16 @@ func (o *Overrides) BloomCompactorEnabled(userID string) bool {
 	return o.getOverridesForUser(userID).BloomCompactorEnabled
 }
 
-func (o *Overrides) BloomNGramLength(userID string) int {
-	return o.getOverridesForUser(userID).BloomNGramLength
+func (o *Overrides) BloomNGramLength(userID string) uint64 {
+	return uint64(o.getOverridesForUser(userID).BloomNGramLength)
 }
 
-func (o *Overrides) BloomNGramSkip(userID string) int {
-	return o.getOverridesForUser(userID).BloomNGramSkip
+func (o *Overrides) BloomNGramSkip(userID string) uint64 {
+	return uint64(o.getOverridesForUser(userID).BloomNGramSkip)
 }
 
+func (o *Overrides) BloomCompactorMaxBlockSize(userID string) uint64 {
+	return uint64(o.getOverridesForUser(userID).BloomCompactorMaxBlockSize.Val())
+}
 
 func (o *Overrides) BloomFalsePositiveRate(userID string) float64 {
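
With the fields wired through pkg/validation, all three settings become tunable per tenant. A sketch of a runtime-overrides entry using the YAML keys from the struct tags above (the tenant name and values are illustrative; the overrides layout assumes Loki's usual runtime-config shape):

overrides:
  tenant-a:
    bloom_ngram_length: 5
    bloom_ngram_skip: 1
    bloom_false_positive_rate: 0.01
    bloom_compactor_max_block_size: 100MB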
