Skip to content

Commit

Permalink
chore(blooms): Clean up bloom component configuration (#12387)
Browse files Browse the repository at this point in the history
* Remove unused setting and move download parallelism configuration one level up.
* Update description of bloom_shipper config block 

Signed-off-by: Christian Haudum <[email protected]>
  • Loading branch information
chaudum authored Apr 2, 2024
1 parent 36c703d commit 71602eb
Show file tree
Hide file tree
Showing 9 changed files with 21 additions and 46 deletions.
15 changes: 5 additions & 10 deletions docs/sources/configure/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -2351,7 +2351,8 @@ tsdb_shipper:

[ingesterdbretainperiod: <duration>]

# Configures Bloom Shipper.
# Configures the bloom shipper component, which contains the store abstraction
# to fetch bloom filters from and put them to object storage.
bloom_shipper:
# Working directory to store downloaded bloom blocks. Supports multiple
# directories, separated by comma.
Expand All @@ -2363,15 +2364,9 @@ bloom_shipper:
# CLI flag: -bloom.max-query-page-size
[max_query_page_size: <int> | default = 64MiB]

blocks_downloading_queue:
# The count of parallel workers that download Bloom Blocks.
# CLI flag: -bloom.shipper.blocks-downloading-queue.workers-count
[workers_count: <int> | default = 16]

# Maximum number of task in queue per tenant per bloom-gateway. Enqueuing
# the tasks above this limit will fail an error.
# CLI flag: -bloom.shipper.blocks-downloading-queue.max_tasks_enqueued_per_tenant
[max_tasks_enqueued_per_tenant: <int> | default = 10000]
# The amount of maximum concurrent bloom blocks downloads.
# CLI flag: -bloom.download-parallelism
[download_parallelism: <int> | default = 16]

blocks_cache:
# Cache for bloom blocks. Soft limit of the cache in bytes. Exceeding this
Expand Down
2 changes: 0 additions & 2 deletions integration/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,6 @@ storage_config:
cache_location: {{.dataPath}}/tsdb-cache
bloom_shipper:
working_directory: {{.dataPath}}/bloom-shipper
blocks_downloading_queue:
workers_count: 1
bloom_gateway:
enabled: false
Expand Down
6 changes: 2 additions & 4 deletions pkg/bloomcompactor/retention_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -802,10 +802,8 @@ func NewMockBloomStoreWithWorkDir(t *testing.T, workDir string) (*bloomshipper.B
Directory: workDir,
},
BloomShipperConfig: config.Config{
WorkingDirectory: []string{workDir},
BlocksDownloadingQueue: config.DownloadingQueueConfig{
WorkersCount: 1,
},
WorkingDirectory: []string{workDir},
DownloadParallelism: 1,
BlocksCache: config.BlocksCacheConfig{
SoftLimit: 1 << 20,
HardLimit: 2 << 20,
Expand Down
6 changes: 2 additions & 4 deletions pkg/bloomgateway/bloomgateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,8 @@ func setupBloomStore(t *testing.T) *bloomshipper.BloomStore {
}
storageCfg := storage.Config{
BloomShipperConfig: bloomshipperconfig.Config{
WorkingDirectory: []string{t.TempDir()},
BlocksDownloadingQueue: bloomshipperconfig.DownloadingQueueConfig{
WorkersCount: 1,
},
WorkingDirectory: []string{t.TempDir()},
DownloadParallelism: 1,
BlocksCache: bloomshipperconfig.BlocksCacheConfig{
SoftLimit: flagext.Bytes(10 << 20),
HardLimit: flagext.Bytes(20 << 20),
Expand Down
6 changes: 2 additions & 4 deletions pkg/loki/modules_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,10 +367,8 @@ func minimalWorkingConfig(t *testing.T, dir, target string, cfgTransformers ...f
cfg.StorageConfig = storage.Config{
FSConfig: local.FSConfig{Directory: dir},
BloomShipperConfig: bloomshipperconfig.Config{
WorkingDirectory: []string{filepath.Join(dir, "blooms")},
BlocksDownloadingQueue: bloomshipperconfig.DownloadingQueueConfig{
WorkersCount: 1,
},
WorkingDirectory: []string{filepath.Join(dir, "blooms")},
DownloadParallelism: 1,
},
BoltDBShipperConfig: boltdb.IndexCfg{
Config: indexshipper.Config{
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ type Config struct {
MaxChunkBatchSize int `yaml:"max_chunk_batch_size"`
BoltDBShipperConfig boltdb.IndexCfg `yaml:"boltdb_shipper" doc:"description=Configures storing index in an Object Store (GCS/S3/Azure/Swift/COS/Filesystem) in the form of boltdb files. Required fields only required when boltdb-shipper is defined in config."`
TSDBShipperConfig indexshipper.Config `yaml:"tsdb_shipper" doc:"description=Configures storing index in an Object Store (GCS/S3/Azure/Swift/COS/Filesystem) in a prometheus TSDB-like format. Required fields only required when TSDB is defined in config."`
BloomShipperConfig bloomshipperconfig.Config `yaml:"bloom_shipper" doc:"description=Configures Bloom Shipper."`
BloomShipperConfig bloomshipperconfig.Config `yaml:"bloom_shipper" doc:"description=Configures the bloom shipper component, which contains the store abstraction to fetch bloom filters from and put them to object storage."`

// Config for using AsyncStore when using async index stores like `boltdb-shipper`.
// It is required for getting chunk ids of recently flushed chunks from the ingesters.
Expand Down
22 changes: 6 additions & 16 deletions pkg/storage/stores/shipper/bloomshipper/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,29 +12,19 @@ import (
)

type Config struct {
WorkingDirectory flagext.StringSliceCSV `yaml:"working_directory"`
MaxQueryPageSize flagext.Bytes `yaml:"max_query_page_size"`
BlocksDownloadingQueue DownloadingQueueConfig `yaml:"blocks_downloading_queue"`
BlocksCache BlocksCacheConfig `yaml:"blocks_cache"`
MetasCache cache.Config `yaml:"metas_cache"`
}

type DownloadingQueueConfig struct {
WorkersCount int `yaml:"workers_count"`
MaxTasksEnqueuedPerTenant int `yaml:"max_tasks_enqueued_per_tenant"`
}

func (cfg *DownloadingQueueConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.IntVar(&cfg.WorkersCount, prefix+"workers-count", 16, "The count of parallel workers that download Bloom Blocks.")
f.IntVar(&cfg.MaxTasksEnqueuedPerTenant, prefix+"max_tasks_enqueued_per_tenant", 10_000, "Maximum number of task in queue per tenant per bloom-gateway. Enqueuing the tasks above this limit will fail an error.")
WorkingDirectory flagext.StringSliceCSV `yaml:"working_directory"`
MaxQueryPageSize flagext.Bytes `yaml:"max_query_page_size"`
DownloadParallelism int `yaml:"download_parallelism"`
BlocksCache BlocksCacheConfig `yaml:"blocks_cache"`
MetasCache cache.Config `yaml:"metas_cache"`
}

func (c *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
c.WorkingDirectory = []string{"/data/blooms"}
f.Var(&c.WorkingDirectory, prefix+"shipper.working-directory", "Working directory to store downloaded bloom blocks. Supports multiple directories, separated by comma.")
_ = c.MaxQueryPageSize.Set("64MiB") // default should match the one set in pkg/storage/bloom/v1/bloom.go
f.Var(&c.MaxQueryPageSize, prefix+"max-query-page-size", "Maximum size of bloom pages that should be queried. Larger pages than this limit are skipped when querying blooms to limit memory usage.")
c.BlocksDownloadingQueue.RegisterFlagsWithPrefix(prefix+"shipper.blocks-downloading-queue.", f)
f.IntVar(&c.DownloadParallelism, prefix+"download-parallelism", 16, "The amount of maximum concurrent bloom blocks downloads.")
c.BlocksCache.RegisterFlagsWithPrefixAndDefaults(prefix+"blocks-cache.", "Cache for bloom blocks. ", f, 24*time.Hour)
c.MetasCache.RegisterFlagsWithPrefix(prefix+"metas-cache.", "Cache for bloom metas. ", f)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/stores/shipper/bloomshipper/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ func NewBloomStore(
// TODO(chaudum): Remove wrapper
cfg := bloomStoreConfig{
workingDirs: storageConfig.BloomShipperConfig.WorkingDirectory,
numWorkers: storageConfig.BloomShipperConfig.BlocksDownloadingQueue.WorkersCount,
numWorkers: storageConfig.BloomShipperConfig.DownloadParallelism,
maxBloomPageSize: int(storageConfig.BloomShipperConfig.MaxQueryPageSize),
}

Expand Down
6 changes: 2 additions & 4 deletions pkg/storage/stores/shipper/bloomshipper/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,8 @@ func newMockBloomStoreWithWorkDir(t *testing.T, workDir, storeDir string) (*Bloo
Directory: storeDir,
},
BloomShipperConfig: config.Config{
WorkingDirectory: []string{workDir},
BlocksDownloadingQueue: config.DownloadingQueueConfig{
WorkersCount: 1,
},
WorkingDirectory: []string{workDir},
DownloadParallelism: 1,
BlocksCache: config.BlocksCacheConfig{
SoftLimit: 1 << 20,
HardLimit: 2 << 20,
Expand Down

0 comments on commit 71602eb

Please sign in to comment.