diff --git a/docs/sources/configure/_index.md b/docs/sources/configure/_index.md
index e79a2503176fc..d5dd9b43bd146 100644
--- a/docs/sources/configure/_index.md
+++ b/docs/sources/configure/_index.md
@@ -2641,21 +2641,22 @@ ring:
 # CLI flag: -bloom-compactor.enabled
 [enabled: <boolean> | default = false]
 
-# Directory where files can be downloaded for compaction.
-# CLI flag: -bloom-compactor.working-directory
-[working_directory: <string> | default = ""]
-
 # Interval at which to re-run the compaction operation.
 # CLI flag: -bloom-compactor.compaction-interval
 [compaction_interval: <duration> | default = 10m]
 
-# Minimum age of a table before it is considered for compaction.
-# CLI flag: -bloom-compactor.min-compaction-age
-[min_compaction_age: <duration> | default = 24h]
-
-# Maximum age of a table before it is considered for compaction.
-# CLI flag: -bloom-compactor.max-compaction-age
-[max_compaction_age: <duration> | default = 168h]
+# How many index periods (days) to wait before compacting a table. This can be
+# used to lower cost by not re-writing data to object storage too frequently
+# since recent data changes more often.
+# CLI flag: -bloom-compactor.min-table-compaction-period
+[min_table_compaction_period: <int> | default = 1]
+
+# The maximum number of index periods (days) in the past to compact tables
+# for. This can be used to lower cost by not trying to compact older data
+# which doesn't change. This can be optimized by aligning it with the maximum
+# `reject_old_samples_max_age` setting of any tenant.
+# CLI flag: -bloom-compactor.max-table-compaction-period
+[max_table_compaction_period: <int> | default = 7]
 
 # Number of workers to run in parallel for compaction.
 # CLI flag: -bloom-compactor.worker-parallelism
diff --git a/integration/cluster/cluster.go b/integration/cluster/cluster.go
index 831da46f2cb99..7e978b84eb326 100644
--- a/integration/cluster/cluster.go
+++ b/integration/cluster/cluster.go
@@ -84,7 +84,6 @@ bloom_gateway:
 
 bloom_compactor:
   enabled: false
-  working_directory: {{.dataPath}}/bloom-compactor
 
 compactor:
   working_directory: {{.dataPath}}/compactor
diff --git a/pkg/bloomcompactor/bloomcompactor.go b/pkg/bloomcompactor/bloomcompactor.go
index 8a3e7c6266c1d..5cece24172526 100644
--- a/pkg/bloomcompactor/bloomcompactor.go
+++ b/pkg/bloomcompactor/bloomcompactor.go
@@ -205,9 +205,19 @@ func (c *Compactor) runOne(ctx context.Context) error {
 }
 
 func (c *Compactor) tables(ts time.Time) *dayRangeIterator {
-    from := model.TimeFromUnixNano(ts.Add(-c.cfg.MaxCompactionAge).UnixNano() / int64(config.ObjectStorageIndexRequiredPeriod))
-    through := model.TimeFromUnixNano(ts.Add(-c.cfg.MinCompactionAge).UnixNano() / int64(config.ObjectStorageIndexRequiredPeriod))
-    return newDayRangeIterator(DayTable(from), DayTable(through))
+    // adjust the minimum by one to make it inclusive, which is more intuitive
+    // for a configuration variable
+    adjustedMin := min(c.cfg.MinTableCompactionPeriod - 1)
+    minCompactionPeriod := time.Duration(adjustedMin) * config.ObjectStorageIndexRequiredPeriod
+    maxCompactionPeriod := time.Duration(c.cfg.MaxTableCompactionPeriod) * config.ObjectStorageIndexRequiredPeriod
+
+    from := ts.Add(-maxCompactionPeriod).UnixNano() / int64(config.ObjectStorageIndexRequiredPeriod) * int64(config.ObjectStorageIndexRequiredPeriod)
+    through := ts.Add(-minCompactionPeriod).UnixNano() / int64(config.ObjectStorageIndexRequiredPeriod) * int64(config.ObjectStorageIndexRequiredPeriod)
+
+    fromDay := DayTable(model.TimeFromUnixNano(from))
+    throughDay := DayTable(model.TimeFromUnixNano(through))
+    return newDayRangeIterator(fromDay, throughDay)
+
 }
 
 func (c *Compactor) loadWork(ctx context.Context, ch chan<- tenantTable) error {
diff --git a/pkg/bloomcompactor/config.go b/pkg/bloomcompactor/config.go
index 37aac3310829a..dd821d81c906b 100644
--- a/pkg/bloomcompactor/config.go
+++ b/pkg/bloomcompactor/config.go
@@ -20,15 +20,14 @@ type Config struct {
     // section and the ingester configuration by default).
     Ring ring.RingConfig `yaml:"ring,omitempty" doc:"description=Defines the ring to be used by the bloom-compactor servers. In case this isn't configured, this block supports inheriting configuration from the common ring section."`
     // Enabled configures whether bloom-compactors should be used to compact index values into bloomfilters
-    Enabled            bool          `yaml:"enabled"`
-    WorkingDirectory   string        `yaml:"working_directory"`
-    CompactionInterval time.Duration `yaml:"compaction_interval"`
-    MinCompactionAge   time.Duration `yaml:"min_compaction_age"`
-    MaxCompactionAge   time.Duration `yaml:"max_compaction_age"`
-    WorkerParallelism  int           `yaml:"worker_parallelism"`
-    RetryMinBackoff    time.Duration `yaml:"compaction_retries_min_backoff"`
-    RetryMaxBackoff    time.Duration `yaml:"compaction_retries_max_backoff"`
-    CompactionRetries  int           `yaml:"compaction_retries"`
+    Enabled                  bool          `yaml:"enabled"`
+    CompactionInterval       time.Duration `yaml:"compaction_interval"`
+    MinTableCompactionPeriod int           `yaml:"min_table_compaction_period"`
+    MaxTableCompactionPeriod int           `yaml:"max_table_compaction_period"`
+    WorkerParallelism        int           `yaml:"worker_parallelism"`
+    RetryMinBackoff          time.Duration `yaml:"compaction_retries_min_backoff"`
+    RetryMaxBackoff          time.Duration `yaml:"compaction_retries_max_backoff"`
+    CompactionRetries        int           `yaml:"compaction_retries"`
 
     MaxCompactionParallelism int `yaml:"max_compaction_parallelism"`
 }
@@ -37,23 +36,29 @@ type Config struct {
 func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
     cfg.Ring.RegisterFlagsWithPrefix("bloom-compactor.", "collectors/", f)
     f.BoolVar(&cfg.Enabled, "bloom-compactor.enabled", false, "Flag to enable or disable the usage of the bloom-compactor component.")
-    f.StringVar(&cfg.WorkingDirectory, "bloom-compactor.working-directory", "", "Directory where files can be downloaded for compaction.")
     f.DurationVar(&cfg.CompactionInterval, "bloom-compactor.compaction-interval", 10*time.Minute, "Interval at which to re-run the compaction operation.")
     f.IntVar(&cfg.WorkerParallelism, "bloom-compactor.worker-parallelism", 1, "Number of workers to run in parallel for compaction.")
-    f.DurationVar(&cfg.MinCompactionAge, "bloom-compactor.min-compaction-age", 24*time.Hour, "Minimum age of a table before it is considered for compaction.")
+    f.IntVar(&cfg.MinTableCompactionPeriod, "bloom-compactor.min-table-compaction-period", 1, "How many index periods (days) to wait before compacting a table. This can be used to lower cost by not re-writing data to object storage too frequently since recent data changes more often.")
     // TODO(owen-d): ideally we'd set this per tenant based on their `reject_old_samples_max_age` setting,
     // but due to how we need to discover tenants, we can't do that yet. Tenant+Period discovery is done by
     // iterating the table periods in object storage and looking for tenants within that period.
     // In order to have this done dynamically, we'd need to account for tenant specific overrides, which are also
     // dynamically reloaded.
     // I'm doing it the simple way for now.
-    f.DurationVar(&cfg.MaxCompactionAge, "bloom-compactor.max-compaction-age", 7*24*time.Hour, "Maximum age of a table before it is considered for compaction.")
+    f.IntVar(&cfg.MaxTableCompactionPeriod, "bloom-compactor.max-table-compaction-period", 7, "The maximum number of index periods (days) in the past to compact tables for. This can be used to lower cost by not trying to compact older data which doesn't change. This can be optimized by aligning it with the maximum `reject_old_samples_max_age` setting of any tenant.")
     f.DurationVar(&cfg.RetryMinBackoff, "bloom-compactor.compaction-retries-min-backoff", 10*time.Second, "Minimum backoff time between retries.")
     f.DurationVar(&cfg.RetryMaxBackoff, "bloom-compactor.compaction-retries-max-backoff", time.Minute, "Maximum backoff time between retries.")
     f.IntVar(&cfg.CompactionRetries, "bloom-compactor.compaction-retries", 3, "Number of retries to perform when compaction fails.")
     f.IntVar(&cfg.MaxCompactionParallelism, "bloom-compactor.max-compaction-parallelism", 1, "Maximum number of tables to compact in parallel. While increasing this value, please make sure compactor has enough disk space allocated to be able to store and compact as many tables.")
 }
 
+func (cfg *Config) Validate() error {
+    if cfg.MinTableCompactionPeriod > cfg.MaxTableCompactionPeriod {
+        return fmt.Errorf("min_table_compaction_period must be less than or equal to max_table_compaction_period")
+    }
+    return nil
+}
+
 type Limits interface {
     downloads.Limits
     BloomCompactorShardSize(tenantID string) int
diff --git a/pkg/bloomcompactor/controller.go b/pkg/bloomcompactor/controller.go
index cf6fff090f0ae..38831ef932e69 100644
--- a/pkg/bloomcompactor/controller.go
+++ b/pkg/bloomcompactor/controller.go
@@ -179,13 +179,20 @@ func (s *SimpleBloomController) buildBlocks(
         closePreExistingBlocks()
         return errors.Wrap(err, "failed to get client")
     }
-    for newBlocks.Next() {
+
+    for newBlocks.Next() && newBlocks.Err() == nil {
         blockCt++
         blk := newBlocks.At()
 
+        built, err := bloomshipper.BlockFrom(tenant, table.String(), blk)
+        if err != nil {
+            level.Error(logger).Log("msg", "failed to build block", "err", err)
+            return errors.Wrap(err, "failed to build block")
+        }
+
         if err := client.PutBlock(
             ctx,
-            bloomshipper.BlockFrom(tenant, table.String(), blk),
+            built,
         ); err != nil {
             level.Error(logger).Log("msg", "failed to write block", "err", err)
             closePreExistingBlocks()
diff --git a/pkg/bloomcompactor/spec.go b/pkg/bloomcompactor/spec.go
index 70ea71c4e605f..d9d9c68947a73 100644
--- a/pkg/bloomcompactor/spec.go
+++ b/pkg/bloomcompactor/spec.go
@@ -275,7 +275,7 @@ func NewStoreChunkLoader(fetcherProvider fetcherProvider, metrics *Metrics) *Sto
 }
 
 func (s *StoreChunkLoader) Load(ctx context.Context, userID string, series *v1.Series) (*ChunkItersByFingerprint, error) {
-    // NB(owen-d): This is probalby unnecessary as we should only have one fetcher
+    // NB(owen-d): This is probably unnecessary as we should only have one fetcher
     // because we'll only be working on a single index period at a time, but this should protect
     // us in the case of refactoring/changing this and likely isn't a perf bottleneck.
     chksByFetcher := make(map[chunkFetcher][]chunk.Chunk)
@@ -338,9 +338,7 @@ func newBatchedLoader(ctx context.Context, work []chunkWork, batchSize int, metr
 
 func (b *batchedLoader) Next() bool {
     if len(b.batch) > 0 {
-        b.cur, b.err = b.format(b.batch[0])
-        b.batch = b.batch[1:]
-        return b.err == nil
+        return b.prepNext(false)
     }
 
     if len(b.work) == 0 {
@@ -357,7 +355,24 @@ func newBatchedLoader(ctx context.Context, work []chunkWork, batchSize int, metr
         b.work = b.work[1:]
     }
 
+    if len(toFetch) == 0 {
+        return false
+    }
+
     b.batch, b.err = next.fetcher.FetchChunks(b.ctx, toFetch)
+    if b.err != nil {
+        return false
+    }
+
+    return b.prepNext(true)
+}
+
+func (b *batchedLoader) prepNext(checkLen bool) bool {
+    if checkLen && len(b.batch) == 0 {
+        return false
+    }
+    b.cur, b.err = b.format(b.batch[0])
+    b.batch = b.batch[1:]
     return b.err == nil
 }
 
diff --git a/pkg/bloomcompactor/tsdb.go b/pkg/bloomcompactor/tsdb.go
index be45d293f6286..e6fd92961c46c 100644
--- a/pkg/bloomcompactor/tsdb.go
+++ b/pkg/bloomcompactor/tsdb.go
@@ -12,6 +12,7 @@ import (
     "github.com/prometheus/common/model"
     "github.com/prometheus/prometheus/model/labels"
 
+    "github.com/grafana/loki/pkg/chunkenc"
     baseStore "github.com/grafana/loki/pkg/storage"
     v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
     "github.com/grafana/loki/pkg/storage/config"
@@ -49,12 +50,12 @@ func NewBloomTSDBStore(storage storage.Client) *BloomTSDBStore {
 }
 
 func (b *BloomTSDBStore) UsersForPeriod(ctx context.Context, table DayTable) ([]string, error) {
-    _, users, err := b.storage.ListFiles(ctx, table.String(), false)
+    _, users, err := b.storage.ListFiles(ctx, table.String(), true) // bypass cache for ease of testing
     return users, err
 }
 
 func (b *BloomTSDBStore) ResolveTSDBs(ctx context.Context, table DayTable, tenant string) ([]tsdb.SingleTenantTSDBIdentifier, error) {
-    indices, err := b.storage.ListUserFiles(ctx, table.String(), tenant, false)
+    indices, err := b.storage.ListUserFiles(ctx, table.String(), tenant, true) // bypass cache for ease of testing
     if err != nil {
         return nil, errors.Wrap(err, "failed to list user files")
     }
@@ -84,16 +85,25 @@ func (b *BloomTSDBStore) LoadTSDB(
     id tsdb.Identifier,
     bounds v1.FingerprintBounds,
 ) (v1.CloseableIterator[*v1.Series], error) {
-    data, err := b.storage.GetUserFile(ctx, table.String(), tenant, id.Name())
+    withCompression := id.Name() + gzipExtension
+
+    data, err := b.storage.GetUserFile(ctx, table.String(), tenant, withCompression)
     if err != nil {
         return nil, errors.Wrap(err, "failed to get file")
     }
+    defer data.Close()
+
+    decompressorPool := chunkenc.GetReaderPool(chunkenc.EncGZIP)
+    decompressor, err := decompressorPool.GetReader(data)
+    if err != nil {
+        return nil, errors.Wrap(err, "failed to get decompressor")
+    }
+    defer decompressorPool.PutReader(decompressor)
 
-    buf, err := io.ReadAll(data)
+    buf, err := io.ReadAll(decompressor)
     if err != nil {
         return nil, errors.Wrap(err, "failed to read file")
     }
-    _ = data.Close()
 
     reader, err := index.NewReader(index.RealByteSlice(buf))
     if err != nil {
@@ -226,7 +236,8 @@ func NewTSDBStores(
             if err != nil {
                 return nil, errors.Wrap(err, "failed to create object client")
             }
-            res.stores[i] = NewBloomTSDBStore(storage.NewIndexStorageClient(c, cfg.IndexTables.PathPrefix))
+            prefix := path.Join(cfg.IndexTables.PathPrefix, cfg.IndexTables.Prefix)
+            res.stores[i] = NewBloomTSDBStore(storage.NewIndexStorageClient(c, prefix))
         }
     }
diff --git a/pkg/loki/loki.go b/pkg/loki/loki.go
index a83f8ba43394f..75401decb8fc0 100644
--- a/pkg/loki/loki.go
+++ b/pkg/loki/loki.go
@@ -248,6 +248,9 @@ func (c *Config) Validate() error {
     if err := c.QueryRange.Validate(); err != nil {
         return errors.Wrap(err, "invalid query_range config")
     }
+    if err := c.BloomCompactor.Validate(); err != nil {
+        return errors.Wrap(err, "invalid bloom_compactor config")
+    }
 
     if err := ValidateConfigCompatibility(*c); err != nil {
         return err
@@ -648,7 +651,7 @@ func (t *Loki) setupModuleManager() error {
         Write:   {Ingester, Distributor},
         Backend: {QueryScheduler, Ruler, Compactor, IndexGateway, BloomGateway, BloomCompactor},
 
-        All: {QueryScheduler, QueryFrontend, Querier, Ingester, Distributor, Ruler, Compactor},
+        All: {QueryScheduler, QueryFrontend, Querier, Ingester, Distributor, Ruler, Compactor, BloomCompactor},
     }
 
     if t.Cfg.Querier.PerRequestLimitsEnabled {
diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go
index 23e59e711f29b..111d313956881 100644
--- a/pkg/loki/modules.go
+++ b/pkg/loki/modules.go
@@ -691,7 +691,7 @@ func (t *Loki) updateConfigForShipperStore() {
         t.Cfg.StorageConfig.TSDBShipperConfig.Mode = indexshipper.ModeWriteOnly
         t.Cfg.StorageConfig.TSDBShipperConfig.IngesterDBRetainPeriod = shipperQuerierIndexUpdateDelay(t.Cfg.StorageConfig.IndexCacheValidity, t.Cfg.StorageConfig.TSDBShipperConfig.ResyncInterval)
 
-    case t.Cfg.isModuleEnabled(Querier), t.Cfg.isModuleEnabled(Ruler), t.Cfg.isModuleEnabled(Read), t.Cfg.isModuleEnabled(Backend), t.isModuleActive(IndexGateway), t.isModuleActive(BloomCompactor):
+    case t.Cfg.isModuleEnabled(Querier), t.Cfg.isModuleEnabled(Ruler), t.Cfg.isModuleEnabled(Read), t.Cfg.isModuleEnabled(Backend), t.isModuleActive(IndexGateway), t.Cfg.isModuleEnabled(BloomCompactor):
         // We do not want query to do any updates to index
         t.Cfg.StorageConfig.BoltDBShipperConfig.Mode = indexshipper.ModeReadOnly
         t.Cfg.StorageConfig.TSDBShipperConfig.Mode = indexshipper.ModeReadOnly
diff --git a/pkg/storage/bloom/v1/block.go b/pkg/storage/bloom/v1/block.go
index 09cc5fa4866e7..84bc71a6b203c 100644
--- a/pkg/storage/bloom/v1/block.go
+++ b/pkg/storage/bloom/v1/block.go
@@ -32,6 +32,10 @@ func NewBlock(reader BlockReader) *Block {
     }
 }
 
+func (b *Block) Reader() BlockReader {
+    return b.reader
+}
+
 func (b *Block) LoadHeaders() error {
     // TODO(owen-d): better control over when to decode
     if !b.initialized {
diff --git a/pkg/storage/bloom/v1/bloom_tokenizer.go b/pkg/storage/bloom/v1/bloom_tokenizer.go
index 7dd0d8ae44974..59bb2644f87e8 100644
--- a/pkg/storage/bloom/v1/bloom_tokenizer.go
+++ b/pkg/storage/bloom/v1/bloom_tokenizer.go
@@ -94,7 +94,7 @@ func (bt *BloomTokenizer) Populate(swb *SeriesWithBloom, chks Iterator[ChunkRefW
     var tokenBuf []byte
     var prefixLn int
 
-    for chks.Err() == nil && chks.Next() {
+    for chks.Next() && chks.Err() == nil {
         chk := chks.At()
         itr := chk.Itr
         tokenBuf, prefixLn = prefixedToken(bt.lineTokenizer.N, chk.Ref, tokenBuf)
diff --git a/pkg/storage/stores/shipper/bloomshipper/client.go b/pkg/storage/stores/shipper/bloomshipper/client.go
index 2e31106548d1a..80eba70d18cdb 100644
--- a/pkg/storage/stores/shipper/bloomshipper/client.go
+++ b/pkg/storage/stores/shipper/bloomshipper/client.go
@@ -149,7 +149,20 @@ type Block struct {
     Data io.ReadSeekCloser
 }
 
-func BlockFrom(tenant, table string, blk *v1.Block) Block {
+// ClosableReadSeekerAdapter wraps an io.ReadSeeker to make it an io.Closer
+// if it doesn't already implement it.
+type ClosableReadSeekerAdapter struct {
+    io.ReadSeeker
+}
+
+func (c ClosableReadSeekerAdapter) Close() error {
+    if closer, ok := c.ReadSeeker.(io.Closer); ok {
+        return closer.Close()
+    }
+    return nil
+}
+
+func BlockFrom(tenant, table string, blk *v1.Block) (Block, error) {
     md, _ := blk.Metadata()
     ref := Ref{
         TenantID:       tenant,
@@ -159,9 +172,21 @@ func BlockFrom(tenant, table string, blk *v1.Block) Block {
         EndTimestamp:   md.Series.ThroughTs,
         Checksum:       md.Checksum,
     }
+
+    // TODO(owen-d): pool
+    buf := bytes.NewBuffer(nil)
+    err := v1.TarGz(buf, blk.Reader())
+
+    if err != nil {
+        return Block{}, errors.Wrap(err, "archiving+compressing block")
+    }
+
+    reader := bytes.NewReader(buf.Bytes())
+
     return Block{
         BlockRef: BlockRef{Ref: ref},
-    }
+        Data:     ClosableReadSeekerAdapter{reader},
+    }, nil
 }
 
 type BlockClient interface {
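
The `ClosableReadSeekerAdapter` above exists because `BlockFrom` now serializes the block into an in-memory buffer, `Block.Data` is declared as an `io.ReadSeekCloser`, and `bytes.Reader` has no `Close` method. A minimal, self-contained sketch of the same pattern follows; the lower-case type name is only there to keep the example standalone.

package main

import (
    "bytes"
    "fmt"
    "io"
)

// closableReadSeeker promotes an io.ReadSeeker to an io.ReadSeekCloser,
// forwarding Close only when the wrapped value actually implements io.Closer.
type closableReadSeeker struct {
    io.ReadSeeker
}

func (c closableReadSeeker) Close() error {
    if closer, ok := c.ReadSeeker.(io.Closer); ok {
        return closer.Close()
    }
    return nil
}

func main() {
    // bytes.Reader has Read and Seek but no Close; the adapter supplies a no-op one.
    var data io.ReadSeekCloser = closableReadSeeker{bytes.NewReader([]byte("tar+gz block bytes"))}
    b, _ := io.ReadAll(data)
    fmt.Println(string(b), data.Close() == nil)
}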
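
As a rough illustration of how the new `min_table_compaction_period` / `max_table_compaction_period` settings translate into a compaction window, the standalone Go sketch below mirrors the arithmetic in `Compactor.tables`. The names `indexPeriod` and `tableBounds` are illustrative stand-ins rather than identifiers from the patch, and the sketch assumes `config.ObjectStorageIndexRequiredPeriod` is the usual 24h per index table.

package main

import (
    "fmt"
    "time"
)

// indexPeriod stands in for config.ObjectStorageIndexRequiredPeriod: one index table per day.
const indexPeriod = 24 * time.Hour

// tableBounds converts the min/max table-compaction periods (in days) into a
// day-aligned [from, through] window, the way Compactor.tables does. The
// minimum is reduced by one so the setting is inclusive, matching the comment
// in the patch.
func tableBounds(now time.Time, minPeriods, maxPeriods int) (from, through time.Time) {
    minDur := time.Duration(minPeriods-1) * indexPeriod
    maxDur := time.Duration(maxPeriods) * indexPeriod

    // Truncating to a 24h multiple lands on UTC midnight, equivalent to the
    // UnixNano()/period*period rounding used in the patch.
    from = now.Add(-maxDur).Truncate(indexPeriod)
    through = now.Add(-minDur).Truncate(indexPeriod)
    return from, through
}

func main() {
    now := time.Date(2024, 1, 10, 15, 30, 0, 0, time.UTC)
    from, through := tableBounds(now, 1, 7) // the new flag defaults
    fmt.Println(from.Format(time.DateOnly), "->", through.Format(time.DateOnly))
    // prints: 2024-01-03 -> 2024-01-10
}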
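
Separately, `LoadTSDB` in `pkg/bloomcompactor/tsdb.go` now fetches the index object under a gzip-suffixed name and decompresses it through Loki's pooled `chunkenc` readers before handing the bytes to the index reader. The sketch below shows an equivalent read path using only the standard library's `compress/gzip`; `readGzippedIndex` is an illustrative helper rather than a function from the patch, and the reader pooling is intentionally left out.

package main

import (
    "bytes"
    "compress/gzip"
    "fmt"
    "io"
)

// readGzippedIndex mirrors the shape of the new LoadTSDB read path without the
// chunkenc reader pool: fully decompress the fetched object into memory before
// it is wrapped by the TSDB index reader.
func readGzippedIndex(compressed io.Reader) ([]byte, error) {
    gz, err := gzip.NewReader(compressed)
    if err != nil {
        return nil, fmt.Errorf("failed to get decompressor: %w", err)
    }
    defer gz.Close()
    return io.ReadAll(gz)
}

func main() {
    // Round-trip a small payload to exercise the helper.
    var buf bytes.Buffer
    gw := gzip.NewWriter(&buf)
    _, _ = gw.Write([]byte("tsdb index bytes"))
    _ = gw.Close()

    out, err := readGzippedIndex(&buf)
    fmt.Println(string(out), err)
}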