Bloom/running #11918

Merged (18 commits) on Feb 13, 2024
23 changes: 12 additions & 11 deletions docs/sources/configure/_index.md
@@ -2641,21 +2641,22 @@ ring:
# CLI flag: -bloom-compactor.enabled
[enabled: <boolean> | default = false]

# Directory where files can be downloaded for compaction.
# CLI flag: -bloom-compactor.working-directory
[working_directory: <string> | default = ""]

# Interval at which to re-run the compaction operation.
# CLI flag: -bloom-compactor.compaction-interval
[compaction_interval: <duration> | default = 10m]

# Minimum age of a table before it is considered for compaction.
# CLI flag: -bloom-compactor.min-compaction-age
[min_compaction_age: <duration> | default = 24h]

# Maximum age of a table before it is considered for compaction.
# CLI flag: -bloom-compactor.max-compaction-age
[max_compaction_age: <duration> | default = 168h]
# How many index periods (days) to wait before compacting a table. This can be
# used to lower cost by not re-writing data to object storage too frequently
# since recent data changes more often.
# CLI flag: -bloom-compactor.min-table-compaction-period
[min_table_compaction_period: <int> | default = 1]

# How many index periods (days) to wait before compacting a table. This can be
# used to lower cost by not trying to compact older data which doesn't change.
# This can be optimized by aligning it with the maximum
# `reject_old_samples_max_age` setting of any tenant.
# CLI flag: -bloom-compactor.max-table-compaction-period
[max_table_compaction_period: <int> | default = 7]

# Number of workers to run in parallel for compaction.
# CLI flag: -bloom-compactor.worker-parallelism
1 change: 0 additions & 1 deletion integration/cluster/cluster.go
@@ -84,7 +84,6 @@ bloom_gateway:

bloom_compactor:
enabled: false
working_directory: {{.dataPath}}/bloom-compactor

compactor:
working_directory: {{.dataPath}}/compactor
16 changes: 13 additions & 3 deletions pkg/bloomcompactor/bloomcompactor.go
@@ -205,9 +205,19 @@ func (c *Compactor) runOne(ctx context.Context) error {
}

func (c *Compactor) tables(ts time.Time) *dayRangeIterator {
from := model.TimeFromUnixNano(ts.Add(-c.cfg.MaxCompactionAge).UnixNano() / int64(config.ObjectStorageIndexRequiredPeriod))
through := model.TimeFromUnixNano(ts.Add(-c.cfg.MinCompactionAge).UnixNano() / int64(config.ObjectStorageIndexRequiredPeriod))
return newDayRangeIterator(DayTable(from), DayTable(through))
// adjust the minimum by one to make it inclusive, which is more intuitive
// for a configuration variable
adjustedMin := min(c.cfg.MinTableCompactionPeriod - 1)
minCompactionPeriod := time.Duration(adjustedMin) * config.ObjectStorageIndexRequiredPeriod
maxCompactionPeriod := time.Duration(c.cfg.MaxTableCompactionPeriod) * config.ObjectStorageIndexRequiredPeriod

from := ts.Add(-maxCompactionPeriod).UnixNano() / int64(config.ObjectStorageIndexRequiredPeriod) * int64(config.ObjectStorageIndexRequiredPeriod)
through := ts.Add(-minCompactionPeriod).UnixNano() / int64(config.ObjectStorageIndexRequiredPeriod) * int64(config.ObjectStorageIndexRequiredPeriod)

fromDay := DayTable(model.TimeFromUnixNano(from))
throughDay := DayTable(model.TimeFromUnixNano(through))
return newDayRangeIterator(fromDay, throughDay)

}

func (c *Compactor) loadWork(ctx context.Context, ch chan<- tenantTable) error {
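
A minimal, standalone sketch of the day-alignment arithmetic in tables() above, assuming an index period of 24h for config.ObjectStorageIndexRequiredPeriod and the default values of 1 and 7 for the new compaction-period settings; everything here is illustrative rather than Loki API.

package main

import (
	"fmt"
	"time"
)

const indexPeriod = 24 * time.Hour // assumed value of config.ObjectStorageIndexRequiredPeriod

func main() {
	ts := time.Date(2024, 2, 13, 15, 30, 0, 0, time.UTC)
	minPeriod, maxPeriod := 1, 7 // defaults for min/max_table_compaction_period

	// Subtract one from the minimum so the most recent eligible day is included.
	minOffset := time.Duration(minPeriod-1) * indexPeriod
	maxOffset := time.Duration(maxPeriod) * indexPeriod

	// Integer division truncates to day boundaries, mirroring tables().
	from := ts.Add(-maxOffset).UnixNano() / int64(indexPeriod) * int64(indexPeriod)
	through := ts.Add(-minOffset).UnixNano() / int64(indexPeriod) * int64(indexPeriod)

	fmt.Println(time.Unix(0, from).UTC())    // 2024-02-06 00:00:00 +0000 UTC
	fmt.Println(time.Unix(0, through).UTC()) // 2024-02-13 00:00:00 +0000 UTC
}
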
29 changes: 17 additions & 12 deletions pkg/bloomcompactor/config.go
@@ -20,15 +20,14 @@ type Config struct {
// section and the ingester configuration by default).
Ring ring.RingConfig `yaml:"ring,omitempty" doc:"description=Defines the ring to be used by the bloom-compactor servers. In case this isn't configured, this block supports inheriting configuration from the common ring section."`
// Enabled configures whether bloom-compactors should be used to compact index values into bloomfilters
Enabled bool `yaml:"enabled"`
WorkingDirectory string `yaml:"working_directory"`
CompactionInterval time.Duration `yaml:"compaction_interval"`
MinCompactionAge time.Duration `yaml:"min_compaction_age"`
MaxCompactionAge time.Duration `yaml:"max_compaction_age"`
WorkerParallelism int `yaml:"worker_parallelism"`
RetryMinBackoff time.Duration `yaml:"compaction_retries_min_backoff"`
RetryMaxBackoff time.Duration `yaml:"compaction_retries_max_backoff"`
CompactionRetries int `yaml:"compaction_retries"`
Enabled bool `yaml:"enabled"`
CompactionInterval time.Duration `yaml:"compaction_interval"`
MinTableCompactionPeriod int `yaml:"min_table_compaction_period"`
MaxTableCompactionPeriod int `yaml:"max_table_compaction_period"`
WorkerParallelism int `yaml:"worker_parallelism"`
RetryMinBackoff time.Duration `yaml:"compaction_retries_min_backoff"`
RetryMaxBackoff time.Duration `yaml:"compaction_retries_max_backoff"`
CompactionRetries int `yaml:"compaction_retries"`

MaxCompactionParallelism int `yaml:"max_compaction_parallelism"`
}
@@ -37,23 +36,29 @@ type Config struct {
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
cfg.Ring.RegisterFlagsWithPrefix("bloom-compactor.", "collectors/", f)
f.BoolVar(&cfg.Enabled, "bloom-compactor.enabled", false, "Flag to enable or disable the usage of the bloom-compactor component.")
f.StringVar(&cfg.WorkingDirectory, "bloom-compactor.working-directory", "", "Directory where files can be downloaded for compaction.")
f.DurationVar(&cfg.CompactionInterval, "bloom-compactor.compaction-interval", 10*time.Minute, "Interval at which to re-run the compaction operation.")
f.IntVar(&cfg.WorkerParallelism, "bloom-compactor.worker-parallelism", 1, "Number of workers to run in parallel for compaction.")
f.DurationVar(&cfg.MinCompactionAge, "bloom-compactor.min-compaction-age", 24*time.Hour, "Minimum age of a table before it is considered for compaction.")
f.IntVar(&cfg.MinTableCompactionPeriod, "bloom-compactor.min-table-compaction-period", 1, "How many index periods (days) to wait before compacting a table. This can be used to lower cost by not re-writing data to object storage too frequently since recent data changes more often.")
// TODO(owen-d): ideally we'd set this per tenant based on their `reject_old_samples_max_age` setting,
// but due to how we need to discover tenants, we can't do that yet. Tenant+Period discovery is done by
// iterating the table periods in object storage and looking for tenants within that period.
// In order to have this done dynamically, we'd need to account for tenant specific overrides, which are also
// dynamically reloaded.
// I'm doing it the simple way for now.
f.DurationVar(&cfg.MaxCompactionAge, "bloom-compactor.max-compaction-age", 7*24*time.Hour, "Maximum age of a table before it is considered for compaction.")
f.IntVar(&cfg.MaxTableCompactionPeriod, "bloom-compactor.max-table-compaction-period", 7, "How many index periods (days) to wait before compacting a table. This can be used to lower cost by not trying to compact older data which doesn't change. This can be optimized by aligning it with the maximum `reject_old_samples_max_age` setting of any tenant.")
f.DurationVar(&cfg.RetryMinBackoff, "bloom-compactor.compaction-retries-min-backoff", 10*time.Second, "Minimum backoff time between retries.")
f.DurationVar(&cfg.RetryMaxBackoff, "bloom-compactor.compaction-retries-max-backoff", time.Minute, "Maximum backoff time between retries.")
f.IntVar(&cfg.CompactionRetries, "bloom-compactor.compaction-retries", 3, "Number of retries to perform when compaction fails.")
f.IntVar(&cfg.MaxCompactionParallelism, "bloom-compactor.max-compaction-parallelism", 1, "Maximum number of tables to compact in parallel. While increasing this value, please make sure compactor has enough disk space allocated to be able to store and compact as many tables.")
}

func (cfg *Config) Validate() error {
if cfg.MinTableCompactionPeriod > cfg.MaxTableCompactionPeriod {
return fmt.Errorf("min_table_compaction_period must be less than or equal to max_table_compaction_period")
}
return nil
}

type Limits interface {
downloads.Limits
BloomCompactorShardSize(tenantID string) int
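
A hypothetical test (not part of this PR) exercising the new Validate method from inside pkg/bloomcompactor: a configuration whose minimum table compaction period exceeds the maximum should be rejected.

func TestConfigValidate_MinGreaterThanMax(t *testing.T) {
	cfg := Config{
		MinTableCompactionPeriod: 3,
		MaxTableCompactionPeriod: 1,
	}
	// Expect an error because 3 > 1.
	if err := cfg.Validate(); err == nil {
		t.Fatal("expected min_table_compaction_period > max_table_compaction_period to fail validation")
	}
}
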
11 changes: 9 additions & 2 deletions pkg/bloomcompactor/controller.go
@@ -179,13 +179,20 @@ func (s *SimpleBloomController) buildBlocks(
closePreExistingBlocks()
return errors.Wrap(err, "failed to get client")
}
for newBlocks.Next() {

for newBlocks.Next() && newBlocks.Err() == nil {
blockCt++
blk := newBlocks.At()

built, err := bloomshipper.BlockFrom(tenant, table.String(), blk)
if err != nil {
level.Error(logger).Log("msg", "failed to build block", "err", err)
return errors.Wrap(err, "failed to build block")
}

if err := client.PutBlock(
ctx,
bloomshipper.BlockFrom(tenant, table.String(), blk),
built,
); err != nil {
level.Error(logger).Log("msg", "failed to write block", "err", err)
closePreExistingBlocks()
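
The loop above now also stops as soon as the block iterator reports an error. A generic sketch of that consumption pattern, including the post-loop error check such callers typically add; the Iterator interface and consume helper here are illustrative, not types from this PR.

type Iterator[T any] interface {
	Next() bool
	At() T
	Err() error
}

func consume[T any](it Iterator[T], handle func(T) error) error {
	for it.Next() && it.Err() == nil {
		if err := handle(it.At()); err != nil {
			return err
		}
	}
	// Surface an error recorded by the final Next call.
	return it.Err()
}
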
23 changes: 19 additions & 4 deletions pkg/bloomcompactor/spec.go
@@ -275,7 +275,7 @@ func NewStoreChunkLoader(fetcherProvider fetcherProvider, metrics *Metrics) *Sto
}

func (s *StoreChunkLoader) Load(ctx context.Context, userID string, series *v1.Series) (*ChunkItersByFingerprint, error) {
// NB(owen-d): This is probalby unnecessary as we should only have one fetcher
// NB(owen-d): This is probably unnecessary as we should only have one fetcher
// because we'll only be working on a single index period at a time, but this should protect
// us in the case of refactoring/changing this and likely isn't a perf bottleneck.
chksByFetcher := make(map[chunkFetcher][]chunk.Chunk)
@@ -338,9 +337,7 @@ func newBatchedLoader(ctx context.Context, work []chunkWork, batchSize int, metr

func (b *batchedLoader) Next() bool {
if len(b.batch) > 0 {
b.cur, b.err = b.format(b.batch[0])
b.batch = b.batch[1:]
return b.err == nil
return b.prepNext(false)
}

if len(b.work) == 0 {
@@ -357,7 +355,24 @@
b.work = b.work[1:]
}

if len(toFetch) == 0 {
return false
}

b.batch, b.err = next.fetcher.FetchChunks(b.ctx, toFetch)
if b.err != nil {
return false
}

return b.prepNext(true)
}

func (b *batchedLoader) prepNext(checkLen bool) bool {
if checkLen && len(b.batch) == 0 {
return false
}
b.cur, b.err = b.format(b.batch[0])
b.batch = b.batch[1:]
return b.err == nil
}

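
A simplified, self-contained sketch of the batching behaviour that Next and prepNext implement above: drain the current batch, fetch a new one only when it runs out, and end iteration on an empty fetch. The types and field names here are illustrative, not the PR's.

type batchedIter[T any] struct {
	pending [][]T // each element stands in for one fetch of chunks
	batch   []T
	cur     T
}

func (b *batchedIter[T]) Next() bool {
	if len(b.batch) == 0 {
		if len(b.pending) == 0 {
			return false // no work left
		}
		// "Fetch" the next batch; like prepNext(true), an empty result ends iteration.
		b.batch, b.pending = b.pending[0], b.pending[1:]
		if len(b.batch) == 0 {
			return false
		}
	}
	b.cur, b.batch = b.batch[0], b.batch[1:]
	return true
}

func (b *batchedIter[T]) At() T { return b.cur }
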
23 changes: 17 additions & 6 deletions pkg/bloomcompactor/tsdb.go
@@ -12,6 +12,7 @@ import (
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"

"github.com/grafana/loki/pkg/chunkenc"
baseStore "github.com/grafana/loki/pkg/storage"
v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
"github.com/grafana/loki/pkg/storage/config"
@@ -49,12 +50,12 @@ func NewBloomTSDBStore(storage storage.Client) *BloomTSDBStore {
}

func (b *BloomTSDBStore) UsersForPeriod(ctx context.Context, table DayTable) ([]string, error) {
_, users, err := b.storage.ListFiles(ctx, table.String(), false)
_, users, err := b.storage.ListFiles(ctx, table.String(), true) // bypass cache for ease of testing
return users, err
}

func (b *BloomTSDBStore) ResolveTSDBs(ctx context.Context, table DayTable, tenant string) ([]tsdb.SingleTenantTSDBIdentifier, error) {
indices, err := b.storage.ListUserFiles(ctx, table.String(), tenant, false)
indices, err := b.storage.ListUserFiles(ctx, table.String(), tenant, true) // bypass cache for ease of testing
if err != nil {
return nil, errors.Wrap(err, "failed to list user files")
}
@@ -84,16 +85,25 @@ func (b *BloomTSDBStore) LoadTSDB(
id tsdb.Identifier,
bounds v1.FingerprintBounds,
) (v1.CloseableIterator[*v1.Series], error) {
data, err := b.storage.GetUserFile(ctx, table.String(), tenant, id.Name())
withCompression := id.Name() + gzipExtension

data, err := b.storage.GetUserFile(ctx, table.String(), tenant, withCompression)
if err != nil {
return nil, errors.Wrap(err, "failed to get file")
}
defer data.Close()

decompressorPool := chunkenc.GetReaderPool(chunkenc.EncGZIP)
decompressor, err := decompressorPool.GetReader(data)
if err != nil {
return nil, errors.Wrap(err, "failed to get decompressor")
}
defer decompressorPool.PutReader(decompressor)

buf, err := io.ReadAll(data)
buf, err := io.ReadAll(decompressor)
if err != nil {
return nil, errors.Wrap(err, "failed to read file")
}
_ = data.Close()

reader, err := index.NewReader(index.RealByteSlice(buf))
if err != nil {
@@ -226,7 +236,8 @@ func NewTSDBStores(
if err != nil {
return nil, errors.Wrap(err, "failed to create object client")
}
res.stores[i] = NewBloomTSDBStore(storage.NewIndexStorageClient(c, cfg.IndexTables.PathPrefix))
prefix := path.Join(cfg.IndexTables.PathPrefix, cfg.IndexTables.Prefix)
res.stores[i] = NewBloomTSDBStore(storage.NewIndexStorageClient(c, prefix))
}
}

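
LoadTSDB above now fetches the gzip-compressed index file and decompresses it through Loki's chunkenc reader pool before reading it into memory. A rough standard-library equivalent of that flow, using compress/gzip directly instead of the pooled reader, purely for illustration:

import (
	"compress/gzip"
	"fmt"
	"io"
)

func readGzippedObject(r io.ReadCloser) ([]byte, error) {
	defer r.Close()

	gz, err := gzip.NewReader(r)
	if err != nil {
		return nil, fmt.Errorf("failed to get decompressor: %w", err)
	}
	defer gz.Close()

	buf, err := io.ReadAll(gz)
	if err != nil {
		return nil, fmt.Errorf("failed to read file: %w", err)
	}
	return buf, nil
}
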
5 changes: 4 additions & 1 deletion pkg/loki/loki.go
@@ -248,6 +248,9 @@ func (c *Config) Validate() error {
if err := c.QueryRange.Validate(); err != nil {
return errors.Wrap(err, "invalid query_range config")
}
if err := c.BloomCompactor.Validate(); err != nil {
return errors.Wrap(err, "invalid bloom_compactor config")
}

if err := ValidateConfigCompatibility(*c); err != nil {
return err
@@ -648,7 +651,7 @@ func (t *Loki) setupModuleManager() error {
Write: {Ingester, Distributor},
Backend: {QueryScheduler, Ruler, Compactor, IndexGateway, BloomGateway, BloomCompactor},

All: {QueryScheduler, QueryFrontend, Querier, Ingester, Distributor, Ruler, Compactor},
All: {QueryScheduler, QueryFrontend, Querier, Ingester, Distributor, Ruler, Compactor, BloomCompactor},
Contributor commented:

Up for discussion, but I don't think we want to include the bloom compactor in the all target. It comes in handy for testing, I understand.

salvacorts (Contributor) commented on Feb 12, 2024:

I don't think users of the monolith mode will benefit significantly from building blooms, right? In addition, it will increase resource usage and operational burden. So I think we should either remove it or add a feature flag for it that defaults to false (e.g. enable_bloom_compactor_all_target).

Member Author replied:

Fair points all around, but we can leave it in to enable better testing and then remove it prior to release.

}

if t.Cfg.Querier.PerRequestLimitsEnabled {
2 changes: 1 addition & 1 deletion pkg/loki/modules.go
@@ -691,7 +691,7 @@ func (t *Loki) updateConfigForShipperStore() {
t.Cfg.StorageConfig.TSDBShipperConfig.Mode = indexshipper.ModeWriteOnly
t.Cfg.StorageConfig.TSDBShipperConfig.IngesterDBRetainPeriod = shipperQuerierIndexUpdateDelay(t.Cfg.StorageConfig.IndexCacheValidity, t.Cfg.StorageConfig.TSDBShipperConfig.ResyncInterval)

case t.Cfg.isModuleEnabled(Querier), t.Cfg.isModuleEnabled(Ruler), t.Cfg.isModuleEnabled(Read), t.Cfg.isModuleEnabled(Backend), t.isModuleActive(IndexGateway), t.isModuleActive(BloomCompactor):
case t.Cfg.isModuleEnabled(Querier), t.Cfg.isModuleEnabled(Ruler), t.Cfg.isModuleEnabled(Read), t.Cfg.isModuleEnabled(Backend), t.isModuleActive(IndexGateway), t.Cfg.isModuleEnabled(BloomCompactor):
// We do not want query to do any updates to index
t.Cfg.StorageConfig.BoltDBShipperConfig.Mode = indexshipper.ModeReadOnly
t.Cfg.StorageConfig.TSDBShipperConfig.Mode = indexshipper.ModeReadOnly
4 changes: 4 additions & 0 deletions pkg/storage/bloom/v1/block.go
@@ -32,6 +32,10 @@ func NewBlock(reader BlockReader) *Block {
}
}

func (b *Block) Reader() BlockReader {
return b.reader
}

func (b *Block) LoadHeaders() error {
// TODO(owen-d): better control over when to decode
if !b.initialized {
2 changes: 1 addition & 1 deletion pkg/storage/bloom/v1/bloom_tokenizer.go
@@ -94,7 +94,7 @@ func (bt *BloomTokenizer) Populate(swb *SeriesWithBloom, chks Iterator[ChunkRefW
var tokenBuf []byte
var prefixLn int

for chks.Err() == nil && chks.Next() {
for chks.Next() && chks.Err() == nil {
chk := chks.At()
itr := chk.Itr
tokenBuf, prefixLn = prefixedToken(bt.lineTokenizer.N, chk.Ref, tokenBuf)
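
A minimal sketch, with a made-up iterator rather than Loki's, of one way the reordered loop condition above can matter: checking Err() after Next() exits the loop on the same iteration in which the advance records an error, instead of processing one more item first.

import "errors"

type fakeIter struct {
	i    int
	data []string
	err  error
}

func newFakeIter(data []string) *fakeIter { return &fakeIter{i: -1, data: data} }

func (f *fakeIter) Next() bool {
	f.i++
	if f.i == 2 {
		// Simulate an iterator that records an error while still reporting an element.
		f.err = errors.New("fetch failed")
		return true
	}
	return f.i < len(f.data)
}

func (f *fakeIter) At() string { return f.data[f.i] }
func (f *fakeIter) Err() error { return f.err }

// With `for it.Err() == nil && it.Next()` the element at index 2 is still
// processed before the error is noticed; with `for it.Next() && it.Err() == nil`
// the loop stops right after the failing advance.
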
29 changes: 27 additions & 2 deletions pkg/storage/stores/shipper/bloomshipper/client.go
@@ -149,7 +149,20 @@ type Block struct {
Data io.ReadSeekCloser
}

func BlockFrom(tenant, table string, blk *v1.Block) Block {
// ClosableReadSeekerAdapter is a wrapper around io.ReadSeeker to make it an io.Closer
// if it doesn't already implement it.
type ClosableReadSeekerAdapter struct {
io.ReadSeeker
}

func (c ClosableReadSeekerAdapter) Close() error {
if closer, ok := c.ReadSeeker.(io.Closer); ok {
return closer.Close()
}
return nil
}

func BlockFrom(tenant, table string, blk *v1.Block) (Block, error) {
md, _ := blk.Metadata()
ref := Ref{
TenantID: tenant,
@@ -159,9 +172,21 @@ func BlockFrom(tenant, table string, blk *v1.Block) Block {
EndTimestamp: md.Series.ThroughTs,
Checksum: md.Checksum,
}

// TODO(owen-d): pool
buf := bytes.NewBuffer(nil)
err := v1.TarGz(buf, blk.Reader())

if err != nil {
return Block{}, errors.Wrap(err, "archiving+compressing block")
}

reader := bytes.NewReader(buf.Bytes())

return Block{
BlockRef: BlockRef{Ref: ref},
}
Data: ClosableReadSeekerAdapter{reader},
}, nil
}

type BlockClient interface {
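
A short illustration (assumed to live inside the bloomshipper package) of what ClosableReadSeekerAdapter above buys: a *bytes.Reader has no Close method, so Close becomes a no-op, while a reader that does implement io.Closer, such as an *os.File, has its own Close called.

import (
	"bytes"
	"io"
	"os"
)

func adapterExamples() {
	var rs io.ReadSeekCloser

	// *bytes.Reader implements io.ReadSeeker but not io.Closer: Close is a no-op.
	rs = ClosableReadSeekerAdapter{bytes.NewReader([]byte("bloom block bytes"))}
	_ = rs.Close() // returns nil

	// *os.File implements io.Closer, so the adapter delegates to it.
	if f, err := os.Open("block.tar.gz"); err == nil { // hypothetical local archive
		rs = ClosableReadSeekerAdapter{f}
		_ = rs.Close() // closes the file
	}
}
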