feat: Blooms retention (#12258)
salvacorts authored Mar 28, 2024
1 parent cc941fe commit 86c768c
Showing 14 changed files with 1,573 additions and 19 deletions.
9 changes: 9 additions & 0 deletions docs/sources/configure/_index.md
@@ -2734,6 +2734,15 @@ ring:
# and compact as many tables.
# CLI flag: -bloom-compactor.max-compaction-parallelism
[max_compaction_parallelism: <int> | default = 1]

retention:
# Enable bloom retention.
# CLI flag: -bloom-compactor.retention.enabled
[enabled: <boolean> | default = false]

# Max lookback days for retention.
# CLI flag: -bloom-compactor.retention.max-lookback-days
[max_lookback_days: <int> | default = 365]
```
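As a rough guide to how these two settings hook into the code, here is a minimal Go sketch of a flag-registered retention config. The field names are assumptions; the flag names and defaults come from the reference above, and the real struct lives in pkg/bloomcompactor/config.go, diffed further down in this commit.

```go
package main

import (
	"flag"
	"fmt"
)

// RetentionConfig mirrors the two documented settings. The field names are an
// assumption; the real struct is defined in pkg/bloomcompactor/config.go.
type RetentionConfig struct {
	Enabled         bool `yaml:"enabled"`
	MaxLookbackDays int  `yaml:"max_lookback_days"`
}

// RegisterFlags wires the struct to the documented CLI flags and defaults.
func (cfg *RetentionConfig) RegisterFlags(f *flag.FlagSet) {
	f.BoolVar(&cfg.Enabled, "bloom-compactor.retention.enabled", false, "Enable bloom retention.")
	f.IntVar(&cfg.MaxLookbackDays, "bloom-compactor.retention.max-lookback-days", 365, "Max lookback days for retention.")
}

func main() {
	var cfg RetentionConfig
	fs := flag.NewFlagSet("bloom-compactor", flag.ExitOnError)
	cfg.RegisterFlags(fs)
	_ = fs.Parse([]string{"-bloom-compactor.retention.enabled=true"})
	fmt.Printf("%+v\n", cfg) // {Enabled:true MaxLookbackDays:365}
}
```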
### limits_config
28 changes: 23 additions & 5 deletions pkg/bloomcompactor/bloomcompactor.go
@@ -48,7 +48,8 @@ type Compactor struct {

tsdbStore TSDBStore
// TODO(owen-d): ShardingStrategy
controller *SimpleBloomController
controller *SimpleBloomController
retentionManager *RetentionManager

// temporary workaround until bloomStore has implemented read/write shipper interface
bloomStore bloomshipper.Store
@@ -64,7 +65,8 @@ func New(
storeCfg storage.Config,
clientMetrics storage.ClientMetrics,
fetcherProvider stores.ChunkFetcherProvider,
sharding util_ring.TenantSharding,
ring ring.ReadRing,
ringLifeCycler *ring.BasicLifecycler,
limits Limits,
store bloomshipper.StoreWithMetrics,
logger log.Logger,
@@ -74,7 +76,7 @@
cfg: cfg,
schemaCfg: schemaCfg,
logger: logger,
sharding: sharding,
sharding: util_ring.NewTenantShuffleSharding(ring, ringLifeCycler, limits.BloomCompactorShardSize),
limits: limits,
bloomStore: store,
metrics: NewMetrics(r, store.BloomMetrics()),
@@ -100,6 +102,15 @@ func New(
c.logger,
)

c.retentionManager = NewRetentionManager(
c.cfg.RetentionConfig,
c.limits,
c.bloomStore,
newFirstTokenRetentionSharding(ring, ringLifeCycler),
c.metrics,
c.logger,
)

c.Service = services.NewBasicService(c.starting, c.running, c.stopping)
return c, nil
}
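Retention is applied cluster-wide rather than per tenant shard, so only one compactor should run it per iteration. Judging by the name, newFirstTokenRetentionSharding presumably grants ownership to the instance holding the lowest token in the ring; the sketch below illustrates that idea with plain data structures, not the dskit ring API.

```go
package main

import "fmt"

// instance is a simplified stand-in for a ring member and its tokens.
type instance struct {
	id     string
	tokens []uint32
}

// ownsRetention reports whether selfID owns the smallest token in the ring,
// so that exactly one compactor applies retention per iteration.
func ownsRetention(selfID string, ring []instance) bool {
	var owner string
	var lowest uint32
	first := true
	for _, inst := range ring {
		for _, t := range inst.tokens {
			if first || t < lowest {
				first, lowest, owner = false, t, inst.id
			}
		}
	}
	return owner == selfID
}

func main() {
	ring := []instance{
		{id: "compactor-0", tokens: []uint32{100, 900}},
		{id: "compactor-1", tokens: []uint32{50, 500}},
	}
	fmt.Println(ownsRetention("compactor-1", ring)) // true: it owns token 50
}
```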
@@ -214,10 +225,17 @@ func (c *Compactor) runOne(ctx context.Context) error {
c.metrics.compactionsStarted.Inc()
start := time.Now()
level.Info(c.logger).Log("msg", "running bloom compaction", "workers", c.cfg.WorkerParallelism)
var workersErr error
var workersErr, retentionErr error
var wg sync.WaitGroup
input := make(chan *tenantTableRange)

// Launch retention (will return instantly if retention is disabled or not owned by this compactor)
wg.Add(1)
go func() {
retentionErr = c.retentionManager.Apply(ctx)
wg.Done()
}()

tables := c.tables(time.Now())
level.Debug(c.logger).Log("msg", "loaded tables", "tables", tables.TotalDays())

@@ -236,7 +254,7 @@ func (c *Compactor) runOne(ctx context.Context) error {

wg.Wait()
duration := time.Since(start)
err = multierror.New(workersErr, err, ctx.Err()).Err()
err = multierror.New(retentionErr, workersErr, err, ctx.Err()).Err()

if err != nil {
level.Error(c.logger).Log("msg", "compaction iteration failed", "err", err, "duration", duration)
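The runOne changes overlap retention with the compaction workers and fold its error into the iteration result. Below is a self-contained sketch of that pattern, with stand-ins for retentionManager.Apply and the worker pool, and the standard library's errors.Join in place of Loki's multierror helper.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// applyRetention stands in for c.retentionManager.Apply: per the comment in
// runOne, it returns immediately when retention is disabled or this compactor
// does not own it.
func applyRetention(ctx context.Context) error { return nil }

// runWorkers stands in for feeding tenant/table work to the worker pool.
func runWorkers(ctx context.Context) error { return nil }

func runOne(ctx context.Context) error {
	var retentionErr, workersErr error
	var wg sync.WaitGroup

	// Launch retention in its own goroutine so it overlaps with compaction.
	wg.Add(1)
	go func() {
		defer wg.Done()
		retentionErr = applyRetention(ctx)
	}()

	workersErr = runWorkers(ctx)

	wg.Wait()
	// Merge all failure modes into a single error, as the multierror call does.
	return errors.Join(retentionErr, workersErr, ctx.Err())
}

func main() {
	fmt.Println(runOne(context.Background())) // <nil> when everything succeeds
}
```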
8 changes: 8 additions & 0 deletions pkg/bloomcompactor/bloomcompactor_test.go
@@ -149,6 +149,14 @@ type mockLimits struct {
shardSize int
}

func (m mockLimits) RetentionPeriod(_ string) time.Duration {
panic("implement me")
}

func (m mockLimits) StreamRetention(_ string) []validation.StreamRetention {
panic("implement me")
}

func (m mockLimits) AllByUserID() map[string]*validation.Limits {
panic("implement me")
}
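These two panicking stubs exist because the compactor's Limits interface now requires retention limits (config.go below replaces downloads.Limits with RetentionLimits). Here is a hedged sketch of what that contract plausibly looks like, with a placeholder type standing in for validation.StreamRetention and a small test double that returns fixed values instead of panicking.

```go
package main

import (
	"fmt"
	"time"
)

// StreamRetention is a placeholder for validation.StreamRetention: a retention
// period that applies only to streams matching a selector.
type StreamRetention struct {
	Period   time.Duration
	Selector string
}

// RetentionLimits is inferred from the mockLimits methods above; the real
// interface in Loki may carry additional methods.
type RetentionLimits interface {
	// RetentionPeriod returns the tenant-wide retention period.
	RetentionPeriod(tenantID string) time.Duration
	// StreamRetention returns per-stream retention overrides for the tenant.
	StreamRetention(tenantID string) []StreamRetention
}

// fixedLimits is a tiny test double returning the same values for every tenant.
type fixedLimits struct{ period time.Duration }

func (f fixedLimits) RetentionPeriod(string) time.Duration     { return f.period }
func (f fixedLimits) StreamRetention(string) []StreamRetention { return nil }

func main() {
	var limits RetentionLimits = fixedLimits{period: 30 * 24 * time.Hour}
	fmt.Println(limits.RetentionPeriod("tenant-a")) // 720h0m0s
}
```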
10 changes: 8 additions & 2 deletions pkg/bloomcompactor/config.go
@@ -7,7 +7,6 @@ import (

"github.com/pkg/errors"

"github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/downloads"
"github.com/grafana/loki/pkg/util/ring"
)

@@ -32,6 +31,8 @@ type Config struct {
CompactionRetries int `yaml:"compaction_retries"`

MaxCompactionParallelism int `yaml:"max_compaction_parallelism"`

RetentionConfig RetentionConfig `yaml:"retention"`
}

// RegisterFlags registers flags for the Bloom-Compactor configuration.
@@ -52,6 +53,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.DurationVar(&cfg.RetryMaxBackoff, "bloom-compactor.compaction-retries-max-backoff", time.Minute, "Maximum backoff time between retries.")
f.IntVar(&cfg.CompactionRetries, "bloom-compactor.compaction-retries", 3, "Number of retries to perform when compaction fails.")
f.IntVar(&cfg.MaxCompactionParallelism, "bloom-compactor.max-compaction-parallelism", 1, "Maximum number of tables to compact in parallel. While increasing this value, please make sure compactor has enough disk space allocated to be able to store and compact as many tables.")
cfg.RetentionConfig.RegisterFlags(f)

// Ring
skipFlags := []string{
@@ -66,6 +68,10 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
}

func (cfg *Config) Validate() error {
if err := cfg.RetentionConfig.Validate(); err != nil {
return err
}

if cfg.MinTableOffset > cfg.MaxTableOffset {
return fmt.Errorf("min-table-offset (%d) must be less than or equal to max-table-offset (%d)", cfg.MinTableOffset, cfg.MaxTableOffset)
}
@@ -76,7 +82,7 @@ func (cfg *Config) Validate() error {
}

type Limits interface {
downloads.Limits
RetentionLimits
BloomCompactorShardSize(tenantID string) int
BloomCompactorEnabled(tenantID string) bool
BloomNGramLength(tenantID string) int
48 changes: 48 additions & 0 deletions pkg/bloomcompactor/metrics.go
@@ -43,6 +43,13 @@ type Metrics struct {

progress prometheus.Gauge
timePerTenant *prometheus.CounterVec

// Retention metrics
retentionRunning prometheus.Gauge
retentionTime *prometheus.HistogramVec
retentionDaysPerIteration *prometheus.HistogramVec
retentionTenantsPerIteration *prometheus.HistogramVec
retentionTenantsExceedingLookback prometheus.Gauge
}

func NewMetrics(r prometheus.Registerer, bloomMetrics *v1.Metrics) *Metrics {
@@ -175,6 +182,47 @@ func NewMetrics(r prometheus.Registerer, bloomMetrics *v1.Metrics) *Metrics {
Name: "tenant_compaction_seconds_total",
Help: "Time spent processing a tenant.",
}, []string{tenantLabel}),

// Retention
retentionRunning: promauto.With(r).NewGauge(prometheus.GaugeOpts{
Namespace: metricsNamespace,
Subsystem: metricsSubsystem,
Name: "retention_running",
Help: "1 if retention is running in this compactor.",
}),

retentionTime: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{
Namespace: metricsNamespace,
Subsystem: metricsSubsystem,
Name: "retention_time_seconds",
Help: "Time this retention process took to complete.",
Buckets: prometheus.DefBuckets,
}, []string{"status"}),

retentionDaysPerIteration: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{
Namespace: metricsNamespace,
Subsystem: metricsSubsystem,
Name: "retention_days_processed",
Help: "Number of days iterated over during the retention process.",
// 1day -> 5 years, 10 buckets
Buckets: prometheus.ExponentialBucketsRange(1, 365*5, 10),
}, []string{"status"}),

retentionTenantsPerIteration: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{
Namespace: metricsNamespace,
Subsystem: metricsSubsystem,
Name: "retention_tenants_processed",
Help: "Number of tenants on which retention was applied during the retention process.",
// 1 tenant -> 10k tenants, 10 buckets
Buckets: prometheus.ExponentialBucketsRange(1, 10000, 10),
}, []string{"status"}),

retentionTenantsExceedingLookback: promauto.With(r).NewGauge(prometheus.GaugeOpts{
Namespace: metricsNamespace,
Subsystem: metricsSubsystem,
Name: "retention_tenants_exceeding_lookback",
Help: "Number of tenants with a retention exceeding the configured retention lookback.",
}),
}

return &m
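The retention metrics above imply a conventional usage pattern around each retention pass: flip the running gauge, time the pass, and record the outcome under a status label. The sketch below shows that pattern directly against the Prometheus client; the namespace/subsystem prefixes are dropped and the success/failure status values are assumptions, not taken from this commit.

```go
package main

import (
	"fmt"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

func main() {
	reg := prometheus.NewRegistry()

	running := promauto.With(reg).NewGauge(prometheus.GaugeOpts{
		Name: "bloomcompactor_retention_running",
		Help: "1 if retention is running in this compactor.",
	})
	duration := promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
		Name:    "bloomcompactor_retention_time_seconds",
		Help:    "Time this retention process took to complete.",
		Buckets: prometheus.DefBuckets,
	}, []string{"status"})

	// Around a retention pass: mark it running, time it, record the outcome.
	running.Set(1)
	start := time.Now()
	var err error // stand-in for retentionManager.Apply(ctx)
	running.Set(0)

	status := "success"
	if err != nil {
		status = "failure"
	}
	duration.WithLabelValues(status).Observe(time.Since(start).Seconds())
	fmt.Println("recorded retention pass with status:", status)
}
```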
