diff --git a/docs/sources/configure/_index.md b/docs/sources/configure/_index.md index ab9ab42c18703..4c99c3b4c8d1a 100644 --- a/docs/sources/configure/_index.md +++ b/docs/sources/configure/_index.md @@ -2351,7 +2351,8 @@ tsdb_shipper: [ingesterdbretainperiod: ] -# Configures Bloom Shipper. +# Configures the bloom shipper component, which contains the store abstraction +# to fetch bloom filters from and put them to object storage. bloom_shipper: # Working directory to store downloaded bloom blocks. Supports multiple # directories, separated by comma. @@ -2363,15 +2364,9 @@ bloom_shipper: # CLI flag: -bloom.max-query-page-size [max_query_page_size: | default = 64MiB] - blocks_downloading_queue: - # The count of parallel workers that download Bloom Blocks. - # CLI flag: -bloom.shipper.blocks-downloading-queue.workers-count - [workers_count: | default = 16] - - # Maximum number of task in queue per tenant per bloom-gateway. Enqueuing - # the tasks above this limit will fail an error. - # CLI flag: -bloom.shipper.blocks-downloading-queue.max_tasks_enqueued_per_tenant - [max_tasks_enqueued_per_tenant: | default = 10000] + # The amount of maximum concurrent bloom blocks downloads. + # CLI flag: -bloom.download-parallelism + [download_parallelism: | default = 16] blocks_cache: # Cache for bloom blocks. Soft limit of the cache in bytes. Exceeding this diff --git a/integration/cluster/cluster.go b/integration/cluster/cluster.go index 5e29413a68c62..79dc7ce2809ff 100644 --- a/integration/cluster/cluster.go +++ b/integration/cluster/cluster.go @@ -82,8 +82,6 @@ storage_config: cache_location: {{.dataPath}}/tsdb-cache bloom_shipper: working_directory: {{.dataPath}}/bloom-shipper - blocks_downloading_queue: - workers_count: 1 bloom_gateway: enabled: false diff --git a/pkg/bloomcompactor/retention_test.go b/pkg/bloomcompactor/retention_test.go index 6c3c82c426c3e..0f880a2bd7e2a 100644 --- a/pkg/bloomcompactor/retention_test.go +++ b/pkg/bloomcompactor/retention_test.go @@ -802,10 +802,8 @@ func NewMockBloomStoreWithWorkDir(t *testing.T, workDir string) (*bloomshipper.B Directory: workDir, }, BloomShipperConfig: config.Config{ - WorkingDirectory: []string{workDir}, - BlocksDownloadingQueue: config.DownloadingQueueConfig{ - WorkersCount: 1, - }, + WorkingDirectory: []string{workDir}, + DownloadParallelism: 1, BlocksCache: config.BlocksCacheConfig{ SoftLimit: 1 << 20, HardLimit: 2 << 20, diff --git a/pkg/bloomgateway/bloomgateway_test.go b/pkg/bloomgateway/bloomgateway_test.go index f705bb5eb0919..59f37974a4ce1 100644 --- a/pkg/bloomgateway/bloomgateway_test.go +++ b/pkg/bloomgateway/bloomgateway_test.go @@ -72,10 +72,8 @@ func setupBloomStore(t *testing.T) *bloomshipper.BloomStore { } storageCfg := storage.Config{ BloomShipperConfig: bloomshipperconfig.Config{ - WorkingDirectory: []string{t.TempDir()}, - BlocksDownloadingQueue: bloomshipperconfig.DownloadingQueueConfig{ - WorkersCount: 1, - }, + WorkingDirectory: []string{t.TempDir()}, + DownloadParallelism: 1, BlocksCache: bloomshipperconfig.BlocksCacheConfig{ SoftLimit: flagext.Bytes(10 << 20), HardLimit: flagext.Bytes(20 << 20), diff --git a/pkg/loki/modules_test.go b/pkg/loki/modules_test.go index 1c8945b51a37e..4529eb7c23c88 100644 --- a/pkg/loki/modules_test.go +++ b/pkg/loki/modules_test.go @@ -367,10 +367,8 @@ func minimalWorkingConfig(t *testing.T, dir, target string, cfgTransformers ...f cfg.StorageConfig = storage.Config{ FSConfig: local.FSConfig{Directory: dir}, BloomShipperConfig: bloomshipperconfig.Config{ - WorkingDirectory: []string{filepath.Join(dir, "blooms")}, - BlocksDownloadingQueue: bloomshipperconfig.DownloadingQueueConfig{ - WorkersCount: 1, - }, + WorkingDirectory: []string{filepath.Join(dir, "blooms")}, + DownloadParallelism: 1, }, BoltDBShipperConfig: boltdb.IndexCfg{ Config: indexshipper.Config{ diff --git a/pkg/storage/factory.go b/pkg/storage/factory.go index da687c5ea9c7b..b619d978a7564 100644 --- a/pkg/storage/factory.go +++ b/pkg/storage/factory.go @@ -336,7 +336,7 @@ type Config struct { MaxChunkBatchSize int `yaml:"max_chunk_batch_size"` BoltDBShipperConfig boltdb.IndexCfg `yaml:"boltdb_shipper" doc:"description=Configures storing index in an Object Store (GCS/S3/Azure/Swift/COS/Filesystem) in the form of boltdb files. Required fields only required when boltdb-shipper is defined in config."` TSDBShipperConfig indexshipper.Config `yaml:"tsdb_shipper" doc:"description=Configures storing index in an Object Store (GCS/S3/Azure/Swift/COS/Filesystem) in a prometheus TSDB-like format. Required fields only required when TSDB is defined in config."` - BloomShipperConfig bloomshipperconfig.Config `yaml:"bloom_shipper" doc:"description=Configures Bloom Shipper."` + BloomShipperConfig bloomshipperconfig.Config `yaml:"bloom_shipper" doc:"description=Configures the bloom shipper component, which contains the store abstraction to fetch bloom filters from and put them to object storage."` // Config for using AsyncStore when using async index stores like `boltdb-shipper`. // It is required for getting chunk ids of recently flushed chunks from the ingesters. diff --git a/pkg/storage/stores/shipper/bloomshipper/config/config.go b/pkg/storage/stores/shipper/bloomshipper/config/config.go index 3aef86cabdf22..89a2f30e2dd33 100644 --- a/pkg/storage/stores/shipper/bloomshipper/config/config.go +++ b/pkg/storage/stores/shipper/bloomshipper/config/config.go @@ -12,21 +12,11 @@ import ( ) type Config struct { - WorkingDirectory flagext.StringSliceCSV `yaml:"working_directory"` - MaxQueryPageSize flagext.Bytes `yaml:"max_query_page_size"` - BlocksDownloadingQueue DownloadingQueueConfig `yaml:"blocks_downloading_queue"` - BlocksCache BlocksCacheConfig `yaml:"blocks_cache"` - MetasCache cache.Config `yaml:"metas_cache"` -} - -type DownloadingQueueConfig struct { - WorkersCount int `yaml:"workers_count"` - MaxTasksEnqueuedPerTenant int `yaml:"max_tasks_enqueued_per_tenant"` -} - -func (cfg *DownloadingQueueConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { - f.IntVar(&cfg.WorkersCount, prefix+"workers-count", 16, "The count of parallel workers that download Bloom Blocks.") - f.IntVar(&cfg.MaxTasksEnqueuedPerTenant, prefix+"max_tasks_enqueued_per_tenant", 10_000, "Maximum number of task in queue per tenant per bloom-gateway. Enqueuing the tasks above this limit will fail an error.") + WorkingDirectory flagext.StringSliceCSV `yaml:"working_directory"` + MaxQueryPageSize flagext.Bytes `yaml:"max_query_page_size"` + DownloadParallelism int `yaml:"download_parallelism"` + BlocksCache BlocksCacheConfig `yaml:"blocks_cache"` + MetasCache cache.Config `yaml:"metas_cache"` } func (c *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { @@ -34,7 +24,7 @@ func (c *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { f.Var(&c.WorkingDirectory, prefix+"shipper.working-directory", "Working directory to store downloaded bloom blocks. Supports multiple directories, separated by comma.") _ = c.MaxQueryPageSize.Set("64MiB") // default should match the one set in pkg/storage/bloom/v1/bloom.go f.Var(&c.MaxQueryPageSize, prefix+"max-query-page-size", "Maximum size of bloom pages that should be queried. Larger pages than this limit are skipped when querying blooms to limit memory usage.") - c.BlocksDownloadingQueue.RegisterFlagsWithPrefix(prefix+"shipper.blocks-downloading-queue.", f) + f.IntVar(&c.DownloadParallelism, prefix+"download-parallelism", 16, "The amount of maximum concurrent bloom blocks downloads.") c.BlocksCache.RegisterFlagsWithPrefixAndDefaults(prefix+"blocks-cache.", "Cache for bloom blocks. ", f, 24*time.Hour) c.MetasCache.RegisterFlagsWithPrefix(prefix+"metas-cache.", "Cache for bloom metas. ", f) } diff --git a/pkg/storage/stores/shipper/bloomshipper/store.go b/pkg/storage/stores/shipper/bloomshipper/store.go index ce15d4cc2663a..83d0db9e4296b 100644 --- a/pkg/storage/stores/shipper/bloomshipper/store.go +++ b/pkg/storage/stores/shipper/bloomshipper/store.go @@ -278,7 +278,7 @@ func NewBloomStore( // TODO(chaudum): Remove wrapper cfg := bloomStoreConfig{ workingDirs: storageConfig.BloomShipperConfig.WorkingDirectory, - numWorkers: storageConfig.BloomShipperConfig.BlocksDownloadingQueue.WorkersCount, + numWorkers: storageConfig.BloomShipperConfig.DownloadParallelism, maxBloomPageSize: int(storageConfig.BloomShipperConfig.MaxQueryPageSize), } diff --git a/pkg/storage/stores/shipper/bloomshipper/store_test.go b/pkg/storage/stores/shipper/bloomshipper/store_test.go index 6c206161839a6..3ba7b8d2b5dee 100644 --- a/pkg/storage/stores/shipper/bloomshipper/store_test.go +++ b/pkg/storage/stores/shipper/bloomshipper/store_test.go @@ -58,10 +58,8 @@ func newMockBloomStoreWithWorkDir(t *testing.T, workDir, storeDir string) (*Bloo Directory: storeDir, }, BloomShipperConfig: config.Config{ - WorkingDirectory: []string{workDir}, - BlocksDownloadingQueue: config.DownloadingQueueConfig{ - WorkersCount: 1, - }, + WorkingDirectory: []string{workDir}, + DownloadParallelism: 1, BlocksCache: config.BlocksCacheConfig{ SoftLimit: 1 << 20, HardLimit: 2 << 20,