diff --git a/docs/sources/configure/_index.md b/docs/sources/configure/_index.md index 2548beab8109f..dcf0f4d502cdd 100644 --- a/docs/sources/configure/_index.md +++ b/docs/sources/configure/_index.md @@ -2527,7 +2527,31 @@ ring: # CLI flag: -bloom-compactor.enabled [enabled: | default = false] +# Directory where files can be downloaded for compaction. +# CLI flag: -bloom-compactor.working-directory [working_directory: | default = ""] + +# Interval at which to re-run the compaction operation. +# CLI flag: -bloom-compactor.compaction-interval +[compaction_interval: | default = 10m] + +# Minimum backoff time between retries. +# CLI flag: -bloom-compactor.compaction-retries-min-backoff +[compaction_retries_min_backoff: | default = 10s] + +# Maximum backoff time between retries. +# CLI flag: -bloom-compactor.compaction-retries-max-backoff +[compaction_retries_max_backoff: | default = 1m] + +# Number of retries to perform when compaction fails. +# CLI flag: -bloom-compactor.compaction-retries +[compaction_retries: | default = 3] + +# Maximum number of tables to compact in parallel. While increasing this value, +# please make sure compactor has enough disk space allocated to be able to store +# and compact as many tables. +# CLI flag: -bloom-compactor.max-compaction-parallelism +[max_compaction_parallelism: | default = 1] ``` ### limits_config @@ -2914,6 +2938,23 @@ shard_streams: # CLI flag: -bloom-gateway.shard-size [bloom_gateway_shard_size: | default = 1] +# The shard size defines how many bloom compactors should be used by a tenant +# when computing blooms. If it's set to 0, shuffle sharding is disabled. +# CLI flag: -bloom-compactor.shard-size +[bloom_compactor_shard_size: | default = 1] + +# The maximum age of a table before it is compacted. Do not compact tables older +# than the the configured time. Default to 7 days. 0s means no limit. +# CLI flag: -bloom-compactor.max-table-age +[bloom_compactor_max_table_age: | default = 168h] + +# The minimum age of a table before it is compacted. Do not compact tables newer +# than the the configured time. Default to 1 hour. 0s means no limit. This is +# useful to avoid compacting tables that will be updated with out-of-order +# writes. +# CLI flag: -bloom-compactor.min-table-age +[bloom_compactor_min_table_age: | default = 1h] + # Allow user to send structured metadata in push payload. 
# CLI flag: -validation.allow-structured-metadata [allow_structured_metadata: | default = false] diff --git a/pkg/bloomcompactor/bloomcompactor.go b/pkg/bloomcompactor/bloomcompactor.go index 53a2597df54f0..f7bcd5df40d38 100644 --- a/pkg/bloomcompactor/bloomcompactor.go +++ b/pkg/bloomcompactor/bloomcompactor.go @@ -30,36 +30,35 @@ import ( "math" "os" "path/filepath" - "strconv" - "strings" + "sort" "time" - "github.com/go-kit/log/level" - "github.com/pkg/errors" - "github.com/prometheus/common/model" - - "github.com/grafana/loki/pkg/logproto" - "github.com/grafana/loki/pkg/storage/chunk" - chunk_client "github.com/grafana/loki/pkg/storage/chunk/client" - "github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/downloads" - shipperindex "github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/index" - index_storage "github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/storage" - "github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/tsdb" - tsdbindex "github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/tsdb/index" - util_log "github.com/grafana/loki/pkg/util/log" - "github.com/go-kit/log" - "github.com/grafana/dskit/ring" + "github.com/go-kit/log/level" + "github.com/grafana/dskit/backoff" + "github.com/grafana/dskit/concurrency" + "github.com/grafana/dskit/multierror" "github.com/grafana/dskit/services" + "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" + "github.com/grafana/loki/pkg/compactor/retention" + "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/storage" v1 "github.com/grafana/loki/pkg/storage/bloom/v1" "github.com/grafana/loki/pkg/storage/bloom/v1/filter" + "github.com/grafana/loki/pkg/storage/chunk" + chunk_client "github.com/grafana/loki/pkg/storage/chunk/client" "github.com/grafana/loki/pkg/storage/config" "github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper" "github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper" + shipperindex "github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/index" + index_storage "github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/storage" + "github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/tsdb" + tsdbindex "github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/tsdb/index" + "github.com/grafana/loki/pkg/util" ) const ( @@ -70,15 +69,20 @@ const ( type Compactor struct { services.Service - cfg Config - logger log.Logger - bloomCompactorRing ring.ReadRing + cfg Config + logger log.Logger + schemaCfg config.SchemaConfig + limits Limits + + // temporary workaround until store has implemented read/write shipper interface + bloomShipperClient bloomshipper.Client // Client used to run operations on the bucket storing bloom blocks. 
storeClients map[config.DayTime]storeClient - // temporary workaround until store has implemented read/write shipper interface - bloomShipperClient bloomshipper.Client + sharding ShardingStrategy + + metrics *metrics } type storeClient struct { @@ -88,21 +92,25 @@ type storeClient struct { indexShipper indexshipper.IndexShipper } -func New(cfg Config, - readRing ring.ReadRing, +func New( + cfg Config, storageCfg storage.Config, schemaConfig config.SchemaConfig, - limits downloads.Limits, + limits Limits, logger log.Logger, + sharding ShardingStrategy, clientMetrics storage.ClientMetrics, - _ prometheus.Registerer) (*Compactor, error) { + r prometheus.Registerer, +) (*Compactor, error) { c := &Compactor{ - cfg: cfg, - logger: logger, - bloomCompactorRing: readRing, + cfg: cfg, + logger: logger, + schemaCfg: schemaConfig, + sharding: sharding, + limits: limits, } - //Configure BloomClient for meta.json management + // Configure BloomClient for meta.json management bloomClient, err := bloomshipper.NewBloomClient(schemaConfig.Configs, storageCfg, clientMetrics) if err != nil { return nil, err @@ -118,11 +126,11 @@ func New(cfg Config, case config.BoltDBShipperType: indexStorageCfg = storageCfg.BoltDBShipperConfig.Config default: - level.Warn(util_log.Logger).Log("msg", "skipping period because index type is unsupported") + level.Warn(c.logger).Log("msg", "skipping period because index type is unsupported") continue } - //Configure ObjectClient and IndexShipper for series and chunk management + // Configure ObjectClient and IndexShipper for series and chunk management objectClient, err := storage.NewObjectClient(periodicConfig.ObjectType, storageCfg, clientMetrics) if err != nil { return nil, fmt.Errorf("error creating object client '%s': %w", periodicConfig.ObjectType, err) @@ -157,33 +165,274 @@ func New(cfg Config, chunk: chunk_client.NewClient(objectClient, nil, schemaConfig), indexShipper: indexShipper, } - } // temporary workaround until store has implemented read/write shipper interface c.bloomShipperClient = bloomClient - // TODO use a new service with a loop - c.Service = services.NewIdleService(c.starting, c.stopping) + c.metrics = newMetrics(r) + c.metrics.compactionRunInterval.Set(cfg.CompactionInterval.Seconds()) + + c.Service = services.NewBasicService(c.starting, c.running, c.stopping) return c, nil } -func (c *Compactor) starting(_ context.Context) error { - return nil +func (c *Compactor) starting(_ context.Context) (err error) { + c.metrics.compactorRunning.Set(1) + return err +} + +func (c *Compactor) running(ctx context.Context) error { + // Run an initial compaction before starting the interval. + if err := c.runCompaction(ctx); err != nil { + level.Error(c.logger).Log("msg", "failed to run compaction", "err", err) + } + + ticker := time.NewTicker(util.DurationWithJitter(c.cfg.CompactionInterval, 0.05)) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + c.metrics.compactionRunsStarted.Inc() + if err := c.runCompaction(ctx); err != nil { + c.metrics.compactionRunsErred.Inc() + level.Error(c.logger).Log("msg", "failed to run compaction", "err", err) + continue + } + c.metrics.compactionRunsCompleted.Inc() + case <-ctx.Done(): + return nil + } + } } func (c *Compactor) stopping(_ error) error { + c.metrics.compactorRunning.Set(0) return nil } -type Series struct { // TODO this can be replaced with Job struct based on Salva's ring work. 
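The compactor is now a dskit `BasicService` whose `running()` loop re-triggers compaction on a jittered ticker. A standalone, stdlib-only sketch of that loop shape (the real code uses `util.DurationWithJitter`, the service context, and the run metrics):

```go
package main

import (
	"context"
	"log"
	"math/rand"
	"time"
)

// runLoop mirrors running() above: one compaction up front, then re-run on a
// ticker until the context is canceled. Errors are logged, never fatal.
func runLoop(ctx context.Context, interval time.Duration, compact func(context.Context) error) {
	if err := compact(ctx); err != nil {
		log.Println("initial compaction failed:", err)
	}

	// +/-5% jitter so multiple replicas don't all fire at the same instant.
	jitter := time.Duration((rand.Float64()*2 - 1) * 0.05 * float64(interval))
	ticker := time.NewTicker(interval + jitter)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			if err := compact(ctx); err != nil {
				log.Println("compaction failed:", err)
			}
		case <-ctx.Done():
			return
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()
	runLoop(ctx, time.Second, func(context.Context) error { return nil })
}
```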
- tableName, tenant string - labels labels.Labels - fingerPrint model.Fingerprint - chunks []chunk.Chunk - from, through model.Time - indexPath string +func (c *Compactor) runCompaction(ctx context.Context) error { + var tables []string + for _, sc := range c.storeClients { + // refresh index list cache since previous compaction would have changed the index files in the object store + sc.index.RefreshIndexTableNamesCache(ctx) + tbls, err := sc.index.ListTables(ctx) + if err != nil { + return fmt.Errorf("failed to list tables: %w", err) + } + tables = append(tables, tbls...) + } + + // process most recent tables first + tablesIntervals := getIntervalsForTables(tables) + sortTablesByRange(tables, tablesIntervals) + + parallelism := c.cfg.MaxCompactionParallelism + if parallelism == 0 { + parallelism = len(tables) + } + + // TODO(salvacorts): We currently parallelize at the table level. We may want to parallelize at the tenant and job level as well. + // To do that, we should create a worker pool with c.cfg.MaxCompactionParallelism number of workers. + errs := multierror.New() + _ = concurrency.ForEachJob(ctx, len(tables), parallelism, func(ctx context.Context, i int) error { + tableName := tables[i] + logger := log.With(c.logger, "table", tableName) + level.Info(logger).Log("msg", "compacting table") + err := c.compactTable(ctx, logger, tableName, tablesIntervals[tableName]) + if err != nil { + errs.Add(err) + return nil + } + level.Info(logger).Log("msg", "finished compacting table") + return nil + }) + + return errs.Err() +} + +func (c *Compactor) compactTable(ctx context.Context, logger log.Logger, tableName string, tableInterval model.Interval) error { + // Ensure the context has not been canceled (ie. compactor shutdown has been triggered). + if err := ctx.Err(); err != nil { + return fmt.Errorf("interrupting compaction of table: %w", err) + } + + schemaCfg, ok := schemaPeriodForTable(c.schemaCfg, tableName) + if !ok { + level.Error(logger).Log("msg", "skipping compaction since we can't find schema for table") + return nil + } + + sc, ok := c.storeClients[schemaCfg.From] + if !ok { + return fmt.Errorf("index store client not found for period starting at %s", schemaCfg.From.String()) + } + + _, tenants, err := sc.index.ListFiles(ctx, tableName, false) + if err != nil { + return fmt.Errorf("failed to list files for table %s: %w", tableName, err) + } + + c.metrics.compactionRunDiscoveredTenants.Add(float64(len(tenants))) + level.Info(logger).Log("msg", "discovered tenants from bucket", "users", len(tenants)) + return c.compactUsers(ctx, logger, sc, tableName, tableInterval, tenants) +} + +// See: https://github.com/grafana/mimir/blob/34852137c332d4050e53128481f4f6417daee91e/pkg/compactor/compactor.go#L566-L689 +func (c *Compactor) compactUsers(ctx context.Context, logger log.Logger, sc storeClient, tableName string, tableInterval model.Interval, tenants []string) error { + // Keep track of tenants owned by this shard, so that we can delete the local files for all other users. + errs := multierror.New() + ownedTenants := make(map[string]struct{}, len(tenants)) + for _, tenant := range tenants { + tenantLogger := log.With(logger, "tenant", tenant) + + // Ensure the context has not been canceled (ie. compactor shutdown has been triggered). + if err := ctx.Err(); err != nil { + return fmt.Errorf("interrupting compaction of tenants: %w", err) + } + + // Skip this table if it is too new/old for the tenant limits. 
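The check that follows implements the new per-tenant table-age bounds (`-bloom-compactor.min-table-age` / `-bloom-compactor.max-table-age`): tables whose interval ends too recently are skipped because they may still receive out-of-order writes, and tables that start before the maximum age are skipped entirely. A minimal sketch of the same gate using stdlib `time` (hypothetical helper; the real check works on `model.Time` and the tenant limits):

```go
package main

import (
	"fmt"
	"time"
)

// inAgeWindow reports whether a table's [start, end] interval is old enough and
// young enough to compact. A zero duration disables the corresponding bound.
func inAgeWindow(now, start, end time.Time, minAge, maxAge time.Duration) bool {
	if minAge > 0 && end.After(now.Add(-minAge)) {
		return false // too new: likely still receiving (out-of-order) writes
	}
	if maxAge > 0 && start.Before(now.Add(-maxAge)) {
		return false // too old: outside the compaction horizon
	}
	return true
}

func main() {
	now := time.Now()
	start, end := now.Add(-30*time.Hour), now.Add(-6*time.Hour)
	fmt.Println(inAgeWindow(now, start, end, time.Hour, 7*24*time.Hour)) // true
}
```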
+ now := model.Now() + tableMinAge := c.limits.BloomCompactorMinTableAge(tenant) + tableMaxAge := c.limits.BloomCompactorMaxTableAge(tenant) + if tableMinAge > 0 && tableInterval.End.After(now.Add(-tableMinAge)) { + level.Debug(tenantLogger).Log("msg", "skipping tenant because table is too new ", "table-min-age", tableMinAge, "table-end", tableInterval.End, "now", now) + continue + } + if tableMaxAge > 0 && tableInterval.Start.Before(now.Add(-tableMaxAge)) { + level.Debug(tenantLogger).Log("msg", "skipping tenant because table is too old", "table-max-age", tableMaxAge, "table-start", tableInterval.Start, "now", now) + continue + } + + // Ensure the tenant ID belongs to our shard. + if !c.sharding.OwnsTenant(tenant) { + c.metrics.compactionRunSkippedTenants.Inc() + level.Debug(tenantLogger).Log("msg", "skipping tenant because it is not owned by this shard") + continue + } + + ownedTenants[tenant] = struct{}{} + + if err := c.compactTenantWithRetries(ctx, tenantLogger, sc, tableName, tenant); err != nil { + switch { + case errors.Is(err, context.Canceled): + // We don't want to count shutdowns as failed compactions because we will pick up with the rest of the compaction after the restart. + level.Info(tenantLogger).Log("msg", "compaction for tenant was interrupted by a shutdown") + return nil + default: + c.metrics.compactionRunFailedTenants.Inc() + level.Error(tenantLogger).Log("msg", "failed to compact tenant", "err", err) + errs.Add(err) + } + continue + } + + c.metrics.compactionRunSucceededTenants.Inc() + level.Info(tenantLogger).Log("msg", "successfully compacted tenant") + } + + return errs.Err() + + // TODO: Delete local files for unowned tenants, if there are any. +} + +func (c *Compactor) compactTenant(ctx context.Context, logger log.Logger, sc storeClient, tableName string, tenant string) error { + level.Info(logger).Log("msg", "starting compaction of tenant") + + // Ensure the context has not been canceled (ie. compactor shutdown has been triggered). + if err := ctx.Err(); err != nil { + return err + } + + // Tokenizer is not thread-safe so we need one per goroutine. + bt, _ := v1.NewBloomTokenizer(prometheus.DefaultRegisterer) + + // TODO: Use ForEachConcurrent? 
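`compactUsers` above wraps each tenant in `compactTenantWithRetries`, which relies on dskit's backoff (see `runWithRetries`, defined further down) configured by the new `-bloom-compactor.compaction-retries*` flags. A stdlib-only sketch of the same capped exponential retry shape, for readers who don't know the dskit package:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// retryWithBackoff retries f up to maxRetries times, doubling the wait between
// attempts from min up to a cap of max, and returns the last error seen.
func retryWithBackoff(ctx context.Context, min, max time.Duration, maxRetries int, f func(context.Context) error) error {
	var lastErr error
	delay := min
	for attempt := 0; attempt < maxRetries; attempt++ {
		if lastErr = f(ctx); lastErr == nil {
			return nil
		}
		select {
		case <-time.After(delay):
		case <-ctx.Done():
			return ctx.Err()
		}
		if delay *= 2; delay > max {
			delay = max
		}
	}
	return lastErr
}

func main() {
	err := retryWithBackoff(context.Background(), 10*time.Millisecond, 50*time.Millisecond, 3,
		func(context.Context) error { return errors.New("still failing") })
	fmt.Println(err) // still failing
}
```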
+ errs := multierror.New() + if err := sc.indexShipper.ForEach(ctx, tableName, tenant, func(isMultiTenantIndex bool, idx shipperindex.Index) error { + if isMultiTenantIndex { + return fmt.Errorf("unexpected multi-tenant") + } + + // TODO: Make these casts safely + if err := idx.(*tsdb.TSDBFile).Index.(*tsdb.TSDBIndex).ForSeries( + ctx, nil, + 0, math.MaxInt64, // TODO: Replace with MaxLookBackPeriod + func(labels labels.Labels, fingerprint model.Fingerprint, chksMetas []tsdbindex.ChunkMeta) { + job := NewJob(tenant, tableName, idx.Path(), fingerprint, labels, chksMetas) + jobLogger := log.With(logger, "job", job.String()) + + ownsJob, err := c.sharding.OwnsJob(job) + if err != nil { + c.metrics.compactionRunUnownedJobs.Inc() + level.Error(jobLogger).Log("msg", "failed to check if compactor owns job", "err", err) + errs.Add(err) + return + } + if !ownsJob { + c.metrics.compactionRunUnownedJobs.Inc() + level.Debug(jobLogger).Log("msg", "skipping job because it is not owned by this shard") + return + } + + if err := c.runCompact(ctx, jobLogger, job, c.bloomShipperClient, bt, sc); err != nil { + c.metrics.compactionRunFailedJobs.Inc() + errs.Add(errors.Wrap(err, "runBloomCompact")) + return + } + + c.metrics.compactionRunSucceededJobs.Inc() + }, + ); err != nil { + errs.Add(err) + } + + return nil + }); err != nil { + errs.Add(err) + } + + return errs.Err() +} + +func runWithRetries( + ctx context.Context, + minBackoff, maxBackoff time.Duration, + maxRetries int, + f func(ctx context.Context) error, +) error { + var lastErr error + + retries := backoff.New(ctx, backoff.Config{ + MinBackoff: minBackoff, + MaxBackoff: maxBackoff, + MaxRetries: maxRetries, + }) + + for retries.Ongoing() { + lastErr = f(ctx) + if lastErr == nil { + return nil + } + + retries.Wait() + } + + return lastErr +} + +func (c *Compactor) compactTenantWithRetries(ctx context.Context, logger log.Logger, sc storeClient, tableName string, tenant string) error { + return runWithRetries( + ctx, + c.cfg.RetryMinBackoff, + c.cfg.RetryMaxBackoff, + c.cfg.CompactionRetries, + func(ctx context.Context) error { + return c.compactTenant(ctx, logger, sc, tableName, tenant) + }, + ) } func makeChunkRefs(chksMetas []tsdbindex.ChunkMeta, tenant string, fp model.Fingerprint) []chunk.Chunk { @@ -204,39 +453,44 @@ func makeChunkRefs(chksMetas []tsdbindex.ChunkMeta, tenant string, fp model.Fing } // TODO Revisit this step once v1/bloom lib updated to combine blooms in the same series -func buildBloomBlock(bloomForChks v1.SeriesWithBloom, series Series, workingDir string) (bloomshipper.Block, error) { - localDst := createLocalDirName(workingDir, series) +func buildBloomBlock(ctx context.Context, logger log.Logger, bloomForChks v1.SeriesWithBloom, job Job, workingDir string) (bloomshipper.Block, error) { + // Ensure the context has not been canceled (ie. compactor shutdown has been triggered). 
+ if err := ctx.Err(); err != nil { + return bloomshipper.Block{}, err + } - //write bloom to a local dir + localDst := createLocalDirName(workingDir, job) + + // write bloom to a local dir builder, err := v1.NewBlockBuilder(v1.NewBlockOptions(), v1.NewDirectoryBlockWriter(localDst)) if err != nil { - level.Info(util_log.Logger).Log("creating builder", err) + level.Error(logger).Log("creating builder", err) return bloomshipper.Block{}, err } checksum, err := builder.BuildFrom(v1.NewSliceIter([]v1.SeriesWithBloom{bloomForChks})) if err != nil { - level.Info(util_log.Logger).Log("writing bloom", err) + level.Error(logger).Log("writing bloom", err) return bloomshipper.Block{}, err } blockFile, err := os.Open(filepath.Join(localDst, bloomFileName)) if err != nil { - level.Info(util_log.Logger).Log("reading bloomBlock", err) + level.Error(logger).Log("reading bloomBlock", err) } blocks := bloomshipper.Block{ BlockRef: bloomshipper.BlockRef{ Ref: bloomshipper.Ref{ - TenantID: series.tenant, - TableName: series.tableName, - MinFingerprint: uint64(series.fingerPrint), //TODO will change once we compact multiple blooms into a block - MaxFingerprint: uint64(series.fingerPrint), - StartTimestamp: series.from.Unix(), - EndTimestamp: series.through.Unix(), + TenantID: job.Tenant(), + TableName: job.TableName(), + MinFingerprint: uint64(job.Fingerprint()), // TODO will change once we compact multiple blooms into a block + MaxFingerprint: uint64(job.Fingerprint()), + StartTimestamp: job.From().Unix(), + EndTimestamp: job.Through().Unix(), Checksum: checksum, }, - IndexPath: series.indexPath, + IndexPath: job.IndexPath(), }, Data: blockFile, } @@ -244,50 +498,21 @@ func buildBloomBlock(bloomForChks v1.SeriesWithBloom, series Series, workingDir return blocks, nil } -// TODO Will be replaced with ring implementation in https://github.com/grafana/loki/pull/11154/ -func listSeriesForBlooms(ctx context.Context, objectClient storeClient) ([]Series, error) { - // Returns all the TSDB files, including subdirectories - prefix := "index/" - indices, _, err := objectClient.object.List(ctx, prefix, "") - - if err != nil { - return nil, err - } - - var result []Series - - for _, index := range indices { - s := strings.Split(index.Key, "/") - - if len(s) > 3 { - tableName := s[1] - - if !strings.HasPrefix(tableName, "loki_") || strings.Contains(tableName, "backup") { - continue - } - - userID := s[2] - _, err := strconv.Atoi(userID) - if err != nil { - continue - } - - result = append(result, Series{tableName: tableName, tenant: userID, indexPath: index.Key}) - } - } - return result, nil -} - -func createLocalDirName(workingDir string, series Series) string { - dir := fmt.Sprintf("bloomBlock-%s-%s-%s-%s-%s-%s", series.tableName, series.tenant, series.fingerPrint, series.fingerPrint, series.from, series.through) +func createLocalDirName(workingDir string, job Job) string { + dir := fmt.Sprintf("bloomBlock-%s-%s-%s-%s-%s-%s", job.TableName(), job.Tenant(), job.Fingerprint(), job.Fingerprint(), job.From(), job.Through()) return filepath.Join(workingDir, dir) } -func CompactNewChunks(ctx context.Context, series Series, bt *v1.BloomTokenizer, bloomShipperClient bloomshipper.Client, dst string) (err error) { +func CompactNewChunks(ctx context.Context, logger log.Logger, job Job, chunks []chunk.Chunk, bt *v1.BloomTokenizer, bloomShipperClient bloomshipper.Client, dst string) (err error) { + // Ensure the context has not been canceled (ie. compactor shutdown has been triggered). 
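`CompactNewChunks` hands the series' chunks to the bloom tokenizer (`PopulateSeriesWithBloom`, a few lines down), which cuts log content into n-grams and inserts them into a scalable bloom filter. The real tokenizer and filter live in `pkg/storage/bloom/v1`; the following is only a conceptual, stdlib-only sketch of the idea, with made-up names:

```go
package main

import "fmt"

// bloomAdder is a stand-in for the scalable bloom filter used by the compactor.
type bloomAdder interface{ Add(token []byte) }

type countingFilter struct{ n int }

func (c *countingFilter) Add(_ []byte) { c.n++ }

// addNGrams slides an n-byte window over each line and adds every n-gram,
// which is roughly what tokenizing a chunk into a series bloom amounts to.
func addNGrams(b bloomAdder, lines []string, n int) {
	for _, line := range lines {
		for i := 0; i+n <= len(line); i++ {
			b.Add([]byte(line[i : i+n]))
		}
	}
}

func main() {
	f := &countingFilter{}
	addNGrams(f, []string{"level=error msg=boom"}, 4)
	fmt.Println(f.n, "n-grams added") // 17 n-grams added
}
```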
+ if err := ctx.Err(); err != nil { + return err + } + // Create a bloom for this series bloomForChks := v1.SeriesWithBloom{ Series: &v1.Series{ - Fingerprint: series.fingerPrint, + Fingerprint: job.Fingerprint(), }, Bloom: &v1.Bloom{ ScalableBloomFilter: *filter.NewDefaultScalableBloomFilter(fpRate), @@ -295,19 +520,18 @@ func CompactNewChunks(ctx context.Context, series Series, bt *v1.BloomTokenizer, } // Tokenize data into n-grams - bt.PopulateSeriesWithBloom(&bloomForChks, series.chunks) + bt.PopulateSeriesWithBloom(&bloomForChks, chunks) // Build and upload bloomBlock to storage - blocks, err := buildBloomBlock(bloomForChks, series, dst) + blocks, err := buildBloomBlock(ctx, logger, bloomForChks, job, dst) if err != nil { - level.Info(util_log.Logger).Log("building bloomBlocks", err) + level.Error(logger).Log("building bloomBlocks", err) return } storedBlocks, err := bloomShipperClient.PutBlocks(ctx, []bloomshipper.Block{blocks}) - if err != nil { - level.Info(util_log.Logger).Log("putting blocks to storage", err) + level.Error(logger).Log("putting blocks to storage", err) return } @@ -319,108 +543,80 @@ func CompactNewChunks(ctx context.Context, series Series, bt *v1.BloomTokenizer, Blocks: storedBlockRefs, } - //TODO move this to an outer layer, otherwise creates a meta per block + // TODO move this to an outer layer, otherwise creates a meta per block err = bloomShipperClient.PutMeta(ctx, meta) if err != nil { - level.Info(util_log.Logger).Log("putting meta.json to storage", err) + level.Error(logger).Log("putting meta.json to storage", err) return } return nil } -func (c *Compactor) runCompact(ctx context.Context, bloomShipperClient bloomshipper.Client, storeClient storeClient) error { - - series, err := listSeriesForBlooms(ctx, storeClient) - - // TODO tokenizer is not thread-safe - // consider moving to Job/worker level with https://github.com/grafana/loki/pull/11154/ - // create a tokenizer - bt, _ := v1.NewBloomTokenizer(prometheus.DefaultRegisterer) - - if err != nil { +func (c *Compactor) runCompact(ctx context.Context, logger log.Logger, job Job, bloomShipperClient bloomshipper.Client, bt *v1.BloomTokenizer, storeClient storeClient) error { + // Ensure the context has not been canceled (ie. compactor shutdown has been triggered). 
+ if err := ctx.Err(); err != nil { return err } - for _, s := range series { - err := storeClient.indexShipper.ForEach(ctx, s.tableName, s.tenant, func(isMultiTenantIndex bool, idx shipperindex.Index) error { - if isMultiTenantIndex { - return nil - } + // TODO call bloomShipperClient.GetMetas to get existing meta.json + var metas []bloomshipper.Meta - // TODO make this casting safe - _ = idx.(*tsdb.TSDBFile).Index.(*tsdb.TSDBIndex).ForSeries( - ctx, - nil, // Process all shards - 0, math.MaxInt64, // Replace with MaxLookBackPeriod - - // Get chunks for a series label and a fp - func(ls labels.Labels, fp model.Fingerprint, chksMetas []tsdbindex.ChunkMeta) { - - // TODO call bloomShipperClient.GetMetas to get existing meta.json - var metas []bloomshipper.Meta - - if len(metas) == 0 { - // Get chunks data from list of chunkRefs - chks, err := storeClient.chunk.GetChunks( - ctx, - makeChunkRefs(chksMetas, s.tenant, fp), - ) - if err != nil { - level.Info(util_log.Logger).Log("getting chunks", err) - return - } - - // effectively get min and max of timestamps of the list of chunks in a series - // There must be a better way to get this, ordering chunkRefs by timestamp doesn't fully solve it - // chunk files name have this info in ObjectStore, but it's not really exposed - minFrom := model.Latest - maxThrough := model.Earliest - - for _, c := range chks { - if minFrom > c.From { - minFrom = c.From - } - if maxThrough < c.From { - maxThrough = c.Through - } - } - - series := Series{ - tableName: s.tableName, - tenant: s.tenant, - labels: ls, - fingerPrint: fp, - chunks: chks, - from: minFrom, - through: maxThrough, - indexPath: s.indexPath, - } - - err = CompactNewChunks(ctx, series, bt, bloomShipperClient, c.cfg.WorkingDirectory) - if err != nil { - return - } - } else { - // TODO complete part 2 - periodic compaction for delta from previous period - // When already compacted metas exists - // Deduplicate index paths - uniqueIndexPaths := make(map[string]struct{}) - - for _, meta := range metas { - for _, blockRef := range meta.Blocks { - uniqueIndexPaths[blockRef.IndexPath] = struct{}{} - //... - } - } - - } - }) - return nil - }) + if len(metas) == 0 { + // Get chunks data from list of chunkRefs + chks, err := storeClient.chunk.GetChunks( + ctx, + makeChunkRefs(job.Chunks(), job.Tenant(), job.Fingerprint()), + ) + if err != nil { + return err + } + + err = CompactNewChunks(ctx, logger, job, chks, bt, bloomShipperClient, c.cfg.WorkingDirectory) if err != nil { - return errors.Wrap(err, "getting each series") + return err + } + } else { + // TODO complete part 2 - periodic compaction for delta from previous period + // When already compacted metas exists + // Deduplicate index paths + uniqueIndexPaths := make(map[string]struct{}) + + for _, meta := range metas { + for _, blockRef := range meta.Blocks { + uniqueIndexPaths[blockRef.IndexPath] = struct{}{} + // ... 
+ } } + } + return nil } + +func getIntervalsForTables(tables []string) map[string]model.Interval { + tablesIntervals := make(map[string]model.Interval, len(tables)) + for _, table := range tables { + tablesIntervals[table] = retention.ExtractIntervalFromTableName(table) + } + + return tablesIntervals +} + +func sortTablesByRange(tables []string, intervals map[string]model.Interval) { + sort.Slice(tables, func(i, j int) bool { + // less than if start time is after produces a most recent first sort order + return intervals[tables[i]].Start.After(intervals[tables[j]].Start) + }) +} + +// TODO: comes from pkg/compactor/compactor.go +func schemaPeriodForTable(cfg config.SchemaConfig, tableName string) (config.PeriodConfig, bool) { + tableInterval := retention.ExtractIntervalFromTableName(tableName) + schemaCfg, err := cfg.SchemaForTime(tableInterval.Start) + if err != nil || schemaCfg.IndexTables.TableFor(tableInterval.Start) != tableName { + return config.PeriodConfig{}, false + } + + return schemaCfg, true +} diff --git a/pkg/bloomcompactor/bloomcompactor_test.go b/pkg/bloomcompactor/bloomcompactor_test.go new file mode 100644 index 0000000000000..65c6779750320 --- /dev/null +++ b/pkg/bloomcompactor/bloomcompactor_test.go @@ -0,0 +1,134 @@ +package bloomcompactor + +import ( + "context" + "flag" + "fmt" + "path/filepath" + "testing" + "time" + + "github.com/grafana/dskit/flagext" + "github.com/grafana/dskit/kv/consul" + "github.com/grafana/dskit/ring" + "github.com/grafana/dskit/server" + "github.com/grafana/dskit/services" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/model" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/grafana/loki/pkg/compactor" + "github.com/grafana/loki/pkg/storage" + "github.com/grafana/loki/pkg/storage/chunk/client/local" + "github.com/grafana/loki/pkg/storage/config" + "github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper" + util_log "github.com/grafana/loki/pkg/util/log" + lokiring "github.com/grafana/loki/pkg/util/ring" + "github.com/grafana/loki/pkg/validation" +) + +const ( + indexTablePrefix = "table_" + workingDirName = "working-dir" +) + +func TestCompactor_RunCompaction(t *testing.T) { + servercfg := &server.Config{} + require.Nil(t, servercfg.LogLevel.Set("debug")) + util_log.InitLogger(servercfg, nil, false) + + tempDir := t.TempDir() + indexDir := filepath.Join(tempDir, "index") + + schemaCfg := config.SchemaConfig{ + Configs: []config.PeriodConfig{ + { + From: config.DayTime{Time: model.Time(0)}, + IndexType: "tsdb", + ObjectType: "filesystem", + Schema: "v12", + IndexTables: config.IndexPeriodicTableConfig{ + PathPrefix: "index/", + PeriodicTableConfig: config.PeriodicTableConfig{ + Prefix: indexTablePrefix, + Period: config.ObjectStorageIndexRequiredPeriod, + }}, + }, + }, + } + + daySeconds := int64(24 * time.Hour / time.Second) + tableNumEnd := time.Now().Unix() / daySeconds + tableNumStart := tableNumEnd - 5 + for i := tableNumStart; i <= tableNumEnd; i++ { + compactor.SetupTable( + t, + filepath.Join(indexDir, fmt.Sprintf("%s%d", indexTablePrefix, i)), + compactor.IndexesConfig{ + NumUnCompactedFiles: 5, + NumCompactedFiles: 5, + }, + compactor.PerUserIndexesConfig{ + NumUsers: 5, + IndexesConfig: compactor.IndexesConfig{ + NumUnCompactedFiles: 5, + NumCompactedFiles: 5, + }, + }, + ) + } + + kvStore, cleanUp := consul.NewInMemoryClient(ring.GetCodec(), util_log.Logger, nil) + t.Cleanup(func() { assert.NoError(t, cleanUp.Close()) }) + + var cfg Config + 
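A side note on ordering: `sortTablesByRange` in `bloomcompactor.go` sorts by interval start, descending, so `runCompaction` processes the most recent daily tables (like the ones this test creates) first. A standalone illustration, with day numbers standing in for the extracted intervals:

```go
package main

import (
	"fmt"
	"sort"
)

func main() {
	tables := []string{"table_19700", "table_19702", "table_19701"}
	startDay := map[string]int{"table_19700": 19700, "table_19701": 19701, "table_19702": 19702}

	sort.Slice(tables, func(i, j int) bool {
		// "start is after" => descending by start => newest table first
		return startDay[tables[i]] > startDay[tables[j]]
	})
	fmt.Println(tables) // [table_19702 table_19701 table_19700]
}
```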
flagext.DefaultValues(&cfg) + cfg.WorkingDirectory = filepath.Join(tempDir, workingDirName) + cfg.Ring.KVStore.Mock = kvStore + cfg.Ring.ListenPort = 0 + cfg.Ring.InstanceAddr = "bloomcompactor" + cfg.Ring.InstanceID = "bloomcompactor" + + storageConfig := storage.Config{ + FSConfig: local.FSConfig{Directory: tempDir}, + TSDBShipperConfig: indexshipper.Config{ + ActiveIndexDirectory: indexDir, + ResyncInterval: 1 * time.Minute, + Mode: indexshipper.ModeReadWrite, + CacheLocation: filepath.Join(tempDir, "cache"), + }, + } + + var limits validation.Limits + limits.RegisterFlags(flag.NewFlagSet("limits", flag.PanicOnError)) + overrides, _ := validation.NewOverrides(limits, nil) + + clientMetrics := storage.NewClientMetrics() + t.Cleanup(clientMetrics.Unregister) + + ringManager, err := lokiring.NewRingManager("bloom-compactor", lokiring.ServerMode, cfg.Ring, 1, 1, util_log.Logger, prometheus.DefaultRegisterer) + require.NoError(t, err) + + err = ringManager.StartAsync(context.Background()) + require.NoError(t, err) + require.Eventually(t, func() bool { + return ringManager.State() == services.Running + }, 1*time.Minute, 100*time.Millisecond) + defer func() { + ringManager.StopAsync() + require.Eventually(t, func() bool { + return ringManager.State() == services.Terminated + }, 1*time.Minute, 100*time.Millisecond) + }() + + shuffleSharding := NewShuffleShardingStrategy(ringManager.Ring, ringManager.RingLifecycler, overrides) + + c, err := New(cfg, storageConfig, schemaCfg, overrides, util_log.Logger, shuffleSharding, clientMetrics, nil) + require.NoError(t, err) + + err = c.runCompaction(context.Background()) + require.NoError(t, err) + + // TODO: Once compaction is implemented, verify compaction here. +} diff --git a/pkg/bloomcompactor/config.go b/pkg/bloomcompactor/config.go index a4c3652e6e01b..b87aa4e894918 100644 --- a/pkg/bloomcompactor/config.go +++ b/pkg/bloomcompactor/config.go @@ -2,7 +2,9 @@ package bloomcompactor import ( "flag" + "time" + "github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/downloads" "github.com/grafana/loki/pkg/util/ring" ) @@ -13,12 +15,32 @@ type Config struct { // section and the ingester configuration by default). Ring ring.RingConfig `yaml:"ring,omitempty" doc:"description=Defines the ring to be used by the bloom-compactor servers. In case this isn't configured, this block supports inheriting configuration from the common ring section."` // Enabled configures whether bloom-compactors should be used to compact index values into bloomfilters - Enabled bool `yaml:"enabled"` - WorkingDirectory string `yaml:"working_directory"` + Enabled bool `yaml:"enabled"` + WorkingDirectory string `yaml:"working_directory"` + CompactionInterval time.Duration `yaml:"compaction_interval"` + + RetryMinBackoff time.Duration `yaml:"compaction_retries_min_backoff"` + RetryMaxBackoff time.Duration `yaml:"compaction_retries_max_backoff"` + CompactionRetries int `yaml:"compaction_retries"` + + MaxCompactionParallelism int `yaml:"max_compaction_parallelism"` } // RegisterFlags registers flags for the Bloom-Compactor configuration. 
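The new `Config` fields map one-to-one to the flags registered below. A small in-package sketch of overriding them programmatically after taking the registered defaults, as the test above does for the working directory (all values here are illustrative, not recommendations):

```go
var cfg Config
flagext.DefaultValues(&cfg) // applies the defaults registered below

cfg.WorkingDirectory = "/var/loki/bloom-compactor" // illustrative path
cfg.CompactionInterval = 15 * time.Minute
cfg.MaxCompactionParallelism = 2 // needs disk space for two tables at once
cfg.RetryMinBackoff = 10 * time.Second
cfg.RetryMaxBackoff = time.Minute
cfg.CompactionRetries = 3
```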
func (cfg *Config) RegisterFlags(f *flag.FlagSet) { cfg.Ring.RegisterFlagsWithPrefix("bloom-compactor.", "collectors/", f) f.BoolVar(&cfg.Enabled, "bloom-compactor.enabled", false, "Flag to enable or disable the usage of the bloom-compactor component.") + f.StringVar(&cfg.WorkingDirectory, "bloom-compactor.working-directory", "", "Directory where files can be downloaded for compaction.") + f.DurationVar(&cfg.CompactionInterval, "bloom-compactor.compaction-interval", 10*time.Minute, "Interval at which to re-run the compaction operation.") + f.DurationVar(&cfg.RetryMinBackoff, "bloom-compactor.compaction-retries-min-backoff", 10*time.Second, "Minimum backoff time between retries.") + f.DurationVar(&cfg.RetryMaxBackoff, "bloom-compactor.compaction-retries-max-backoff", time.Minute, "Maximum backoff time between retries.") + f.IntVar(&cfg.CompactionRetries, "bloom-compactor.compaction-retries", 3, "Number of retries to perform when compaction fails.") + f.IntVar(&cfg.MaxCompactionParallelism, "bloom-compactor.max-compaction-parallelism", 1, "Maximum number of tables to compact in parallel. While increasing this value, please make sure compactor has enough disk space allocated to be able to store and compact as many tables.") +} + +type Limits interface { + downloads.Limits + BloomCompactorShardSize(tenantID string) int + BloomCompactorMaxTableAge(tenantID string) time.Duration + BloomCompactorMinTableAge(tenantID string) time.Duration } diff --git a/pkg/bloomcompactor/job.go b/pkg/bloomcompactor/job.go new file mode 100644 index 0000000000000..5257be97095d8 --- /dev/null +++ b/pkg/bloomcompactor/job.go @@ -0,0 +1,97 @@ +package bloomcompactor + +import ( + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/labels" + + "github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/tsdb/index" +) + +type Job struct { + tableName, tenantID, indexPath string + seriesLbs labels.Labels + seriesFP model.Fingerprint + chunks []index.ChunkMeta + + // We compute them lazily. + from, through *model.Time +} + +// NewJob returns a new compaction Job. 
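A `Job` is the unit the compactor shards and retries on: one series (fingerprint) of one tenant in one index table, together with its chunk metadata; `from`/`through` are derived lazily from the chunk bounds. An in-package usage sketch (tenant, table, and path are illustrative values):

```go
lbls := labels.FromStrings("cluster", "dev", "namespace", "loki")
job := NewJob(
	"tenant-a",                              // tenant
	"table_19702",                           // index table the series was found in
	"index/table_19702/tenant-a/index.tsdb", // illustrative TSDB path
	model.Fingerprint(lbls.Hash()),
	lbls,
	nil, // []index.ChunkMeta from the TSDB ForSeries callback
)
fmt.Println(job.String()) // table_19702_tenant-a_<fingerprint>
```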
+func NewJob( + tenantID string, + tableName string, + indexPath string, + seriesFP model.Fingerprint, + seriesLbs labels.Labels, + chunks []index.ChunkMeta, +) Job { + return Job{ + tenantID: tenantID, + tableName: tableName, + indexPath: indexPath, + seriesFP: seriesFP, + seriesLbs: seriesLbs, + chunks: chunks, + } +} + +func (j *Job) String() string { + return j.tableName + "_" + j.tenantID + "_" + j.seriesFP.String() +} + +func (j *Job) TableName() string { + return j.tableName +} + +func (j *Job) Tenant() string { + return j.tenantID +} + +func (j *Job) Fingerprint() model.Fingerprint { + return j.seriesFP +} + +func (j *Job) Chunks() []index.ChunkMeta { + return j.chunks +} + +func (j *Job) Labels() labels.Labels { + return j.seriesLbs +} + +func (j *Job) IndexPath() string { + return j.indexPath +} + +func (j *Job) From() model.Time { + if j.from == nil { + j.computeFromThrough() + } + return *j.from +} + +func (j *Job) Through() model.Time { + if j.through == nil { + j.computeFromThrough() + } + return *j.through +} + +func (j *Job) computeFromThrough() { + minFrom := model.Latest + maxThrough := model.Earliest + + for _, chunk := range j.chunks { + from, through := chunk.Bounds() + if minFrom > from { + minFrom = from + } + if maxThrough < through { + maxThrough = through + } + } + + j.from = &minFrom + j.through = &maxThrough +} diff --git a/pkg/bloomcompactor/metrics.go b/pkg/bloomcompactor/metrics.go new file mode 100644 index 0000000000000..0d8ed23c54ed1 --- /dev/null +++ b/pkg/bloomcompactor/metrics.go @@ -0,0 +1,105 @@ +package bloomcompactor + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +const ( + metricsNamespace = "loki" + metricsSubsystem = "bloomcompactor" +) + +type metrics struct { + compactionRunsStarted prometheus.Counter + compactionRunsCompleted prometheus.Counter + compactionRunsErred prometheus.Counter + compactionRunDiscoveredTenants prometheus.Counter + compactionRunSkippedTenants prometheus.Counter + compactionRunSucceededTenants prometheus.Counter + compactionRunFailedTenants prometheus.Counter + compactionRunUnownedJobs prometheus.Counter + compactionRunSucceededJobs prometheus.Counter + compactionRunFailedJobs prometheus.Counter + compactionRunInterval prometheus.Gauge + compactorRunning prometheus.Gauge +} + +func newMetrics(r prometheus.Registerer) *metrics { + m := metrics{ + compactionRunsStarted: promauto.With(r).NewCounter(prometheus.CounterOpts{ + Namespace: metricsNamespace, + Subsystem: metricsSubsystem, + Name: "runs_started_total", + Help: "Total number of compactions started", + }), + compactionRunsCompleted: promauto.With(r).NewCounter(prometheus.CounterOpts{ + Namespace: metricsNamespace, + Subsystem: metricsSubsystem, + Name: "runs_completed_total", + Help: "Total number of compactions completed successfully", + }), + compactionRunsErred: promauto.With(r).NewCounter(prometheus.CounterOpts{ + Namespace: metricsNamespace, + Subsystem: metricsSubsystem, + Name: "runs_failed_total", + Help: "Total number of compaction runs failed", + }), + compactionRunDiscoveredTenants: promauto.With(r).NewCounter(prometheus.CounterOpts{ + Namespace: metricsNamespace, + Subsystem: metricsSubsystem, + Name: "tenants_discovered", + Help: "Number of tenants discovered during the current compaction run", + }), + compactionRunSkippedTenants: promauto.With(r).NewCounter(prometheus.CounterOpts{ + Namespace: metricsNamespace, + Subsystem: metricsSubsystem, + Name: "tenants_skipped", + 
Help: "Number of tenants skipped during the current compaction run", + }), + compactionRunSucceededTenants: promauto.With(r).NewCounter(prometheus.CounterOpts{ + Namespace: metricsNamespace, + Subsystem: metricsSubsystem, + Name: "tenants_succeeded", + Help: "Number of tenants successfully processed during the current compaction run", + }), + compactionRunFailedTenants: promauto.With(r).NewCounter(prometheus.CounterOpts{ + Namespace: metricsNamespace, + Subsystem: metricsSubsystem, + Name: "tenants_failed", + Help: "Number of tenants failed processing during the current compaction run", + }), + compactionRunUnownedJobs: promauto.With(r).NewCounter(prometheus.CounterOpts{ + Namespace: metricsNamespace, + Subsystem: metricsSubsystem, + Name: "jobs_unowned", + Help: "Number of unowned jobs skipped during the current compaction run", + }), + compactionRunSucceededJobs: promauto.With(r).NewCounter(prometheus.CounterOpts{ + Namespace: metricsNamespace, + Subsystem: metricsSubsystem, + Name: "jobs_succeeded", + Help: "Number of jobs successfully processed during the current compaction run", + }), + compactionRunFailedJobs: promauto.With(r).NewCounter(prometheus.CounterOpts{ + Namespace: metricsNamespace, + Subsystem: metricsSubsystem, + Name: "jobs_failed", + Help: "Number of jobs failed processing during the current compaction run", + }), + compactionRunInterval: promauto.With(r).NewGauge(prometheus.GaugeOpts{ + Namespace: metricsNamespace, + Subsystem: metricsSubsystem, + Name: "compaction_interval_seconds", + Help: "The configured interval on which compaction is run in seconds", + }), + compactorRunning: promauto.With(r).NewGauge(prometheus.GaugeOpts{ + Namespace: metricsNamespace, + Subsystem: metricsSubsystem, + Name: "running", + Help: "Value will be 1 if compactor is currently running on this instance", + }), + } + + return &m +} diff --git a/pkg/bloomcompactor/sharding.go b/pkg/bloomcompactor/sharding.go new file mode 100644 index 0000000000000..093c0c3ac9a31 --- /dev/null +++ b/pkg/bloomcompactor/sharding.go @@ -0,0 +1,43 @@ +package bloomcompactor + +import ( + "github.com/grafana/dskit/ring" + + util_ring "github.com/grafana/loki/pkg/util/ring" +) + +var ( + // TODO: Should we include LEAVING instances in the replication set? + RingOp = ring.NewOp([]ring.InstanceState{ring.JOINING, ring.ACTIVE}, nil) +) + +// ShardingStrategy describes whether compactor "owns" given user or job. +type ShardingStrategy interface { + util_ring.TenantSharding + OwnsJob(job Job) (bool, error) +} + +type ShuffleShardingStrategy struct { + util_ring.TenantSharding + ringLifeCycler *ring.BasicLifecycler +} + +func NewShuffleShardingStrategy(r *ring.Ring, ringLifecycler *ring.BasicLifecycler, limits Limits) *ShuffleShardingStrategy { + s := ShuffleShardingStrategy{ + TenantSharding: util_ring.NewTenantShuffleSharding(r, ringLifecycler, limits.BloomCompactorShardSize), + ringLifeCycler: ringLifecycler, + } + + return &s +} + +// OwnsJob makes sure only a single compactor should execute the job. 
+func (s *ShuffleShardingStrategy) OwnsJob(job Job) (bool, error) { + if !s.OwnsTenant(job.Tenant()) { + return false, nil + } + + tenantRing := s.GetTenantSubRing(job.Tenant()) + fpSharding := util_ring.NewFingerprintShuffleSharding(tenantRing, s.ringLifeCycler, RingOp) + return fpSharding.OwnsFingerprint(uint64(job.Fingerprint())) +} diff --git a/pkg/bloomcompactor/sharding_test.go b/pkg/bloomcompactor/sharding_test.go new file mode 100644 index 0000000000000..d99f883dd3bbb --- /dev/null +++ b/pkg/bloomcompactor/sharding_test.go @@ -0,0 +1,145 @@ +package bloomcompactor + +import ( + "context" + "flag" + "fmt" + "testing" + "time" + + "github.com/grafana/dskit/services" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/labels" + "github.com/stretchr/testify/require" + + "github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/downloads" + util_log "github.com/grafana/loki/pkg/util/log" + lokiring "github.com/grafana/loki/pkg/util/ring" + "github.com/grafana/loki/pkg/validation" +) + +func TestShuffleSharding(t *testing.T) { + const shardSize = 2 + const rings = 4 + const tenants = 2000 + const jobsPerTenant = 200 + + var limits validation.Limits + limits.RegisterFlags(flag.NewFlagSet("limits", flag.PanicOnError)) + overrides, err := validation.NewOverrides(limits, nil) + require.NoError(t, err) + + var ringManagers []*lokiring.RingManager + var shards []*ShuffleShardingStrategy + for i := 0; i < rings; i++ { + var ringCfg lokiring.RingConfig + ringCfg.RegisterFlagsWithPrefix("", "", flag.NewFlagSet("ring", flag.PanicOnError)) + ringCfg.KVStore.Store = "inmemory" + ringCfg.InstanceID = fmt.Sprintf("bloom-compactor-%d", i) + ringCfg.InstanceAddr = fmt.Sprintf("localhost-%d", i) + + ringManager, err := lokiring.NewRingManager("bloom-compactor", lokiring.ServerMode, ringCfg, 1, 1, util_log.Logger, prometheus.NewRegistry()) + require.NoError(t, err) + require.NoError(t, ringManager.StartAsync(context.Background())) + + sharding := NewShuffleShardingStrategy(ringManager.Ring, ringManager.RingLifecycler, mockLimits{ + Limits: overrides, + bloomCompactorShardSize: shardSize, + }) + + ringManagers = append(ringManagers, ringManager) + shards = append(shards, sharding) + } + + // Wait for all rings to see each other. + for i := 0; i < rings; i++ { + require.Eventually(t, func() bool { + running := ringManagers[i].State() == services.Running + discovered := ringManagers[i].Ring.InstancesCount() == rings + return running && discovered + }, 1*time.Minute, 100*time.Millisecond) + } + + // This is kind of an un-deterministic test, because sharding is random + // and the seed is initialized by the ring lib. + // Here we'll generate a bunch of tenants and test that if the sharding doesn't own the tenant, + // that's because the tenant is owned by other ring instances. + shard := shards[0] + otherShards := shards[1:] + var ownedTenants, ownedJobs int + for i := 0; i < tenants; i++ { + tenant := fmt.Sprintf("tenant-%d", i) + ownsTenant := shard.OwnsTenant(tenant) + + var tenantOwnedByOther int + for _, other := range otherShards { + otherOwns := other.OwnsTenant(tenant) + if otherOwns { + tenantOwnedByOther++ + } + } + + // If this shard owns the tenant, shardSize-1 other members should also own the tenant. + // Otherwise, shardSize other members should own the tenant. 
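For example, with four instances and a shard size of two, a tenant's shuffle shard might be {bloom-compactor-1, bloom-compactor-3}: from compactor-1's point of view `ownsTenant` is true and exactly one of the other three shards also reports ownership, while from compactor-0's point of view `ownsTenant` is false and exactly two of the others do. That is the invariant the assertions below encode.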
+ if ownsTenant { + require.Equal(t, shardSize-1, tenantOwnedByOther) + ownedTenants++ + } else { + require.Equal(t, shardSize, tenantOwnedByOther) + } + + for j := 0; j < jobsPerTenant; j++ { + lbls := labels.FromStrings("namespace", fmt.Sprintf("namespace-%d", j)) + job := NewJob(tenant, "", "", model.Fingerprint(lbls.Hash()), lbls, nil) + ownsJob, err := shard.OwnsJob(job) + require.NoError(t, err) + + var jobOwnedByOther int + for _, other := range otherShards { + otherOwns, err := other.OwnsJob(job) + require.NoError(t, err) + if otherOwns { + jobOwnedByOther++ + } + } + + // If this shard owns the job, no one else should own the job. + // And if this shard doesn't own the job, only one of the other shards should own the job. + if ownsJob { + require.Equal(t, 0, jobOwnedByOther) + ownedJobs++ + } else { + require.Equal(t, 1, jobOwnedByOther) + } + } + } + + t.Logf("owned tenants: %d (out of %d)", ownedTenants, tenants) + t.Logf("owned jobs: %d (out of %d)", ownedJobs, tenants*jobsPerTenant) + + // Stop all rings and wait for them to stop. + for i := 0; i < rings; i++ { + ringManagers[i].StopAsync() + require.Eventually(t, func() bool { + return ringManagers[i].State() == services.Terminated + }, 1*time.Minute, 100*time.Millisecond) + } +} + +type mockLimits struct { + downloads.Limits + bloomCompactorShardSize int +} + +func (m mockLimits) BloomCompactorShardSize(_ string) int { + return m.bloomCompactorShardSize +} + +func (m mockLimits) BloomCompactorMaxTableAge(_ string) time.Duration { + return 0 +} + +func (m mockLimits) BloomCompactorMinTableAge(_ string) time.Duration { + return 0 +} diff --git a/pkg/bloomgateway/sharding.go b/pkg/bloomgateway/sharding.go index 95cf4f05ab3a6..34d3c63c43b14 100644 --- a/pkg/bloomgateway/sharding.go +++ b/pkg/bloomgateway/sharding.go @@ -5,6 +5,8 @@ import ( "github.com/go-kit/log" "github.com/grafana/dskit/ring" + + util_ring "github.com/grafana/loki/pkg/util/ring" ) // TODO(chaudum): Replace this placeholder with actual BlockRef struct. @@ -45,20 +47,17 @@ type ShardingStrategy interface { } type ShuffleShardingStrategy struct { - r ring.ReadRing - limits Limits - instanceAddr string - instanceID string - logger log.Logger + util_ring.TenantSharding + r ring.ReadRing + ringLifeCycler *ring.BasicLifecycler + logger log.Logger } -func NewShuffleShardingStrategy(r ring.ReadRing, l Limits, instanceAddr, instanceID string, logger log.Logger) *ShuffleShardingStrategy { +func NewShuffleShardingStrategy(r ring.ReadRing, ringLifecycler *ring.BasicLifecycler, limits Limits, logger log.Logger) *ShuffleShardingStrategy { return &ShuffleShardingStrategy{ - r: r, - limits: l, - instanceAddr: instanceAddr, - instanceID: instanceID, - logger: logger, + TenantSharding: util_ring.NewTenantShuffleSharding(r, ringLifecycler, limits.BloomGatewayShardSize), + ringLifeCycler: ringLifecycler, + logger: logger, } } @@ -69,17 +68,15 @@ func (s *ShuffleShardingStrategy) FilterTenants(_ context.Context, tenantIDs []s // instance, because of the auto-forget feature. if set, err := s.r.GetAllHealthy(BlocksOwnerSync); err != nil { return nil, err - } else if !set.Includes(s.instanceAddr) { + } else if !set.Includes(s.ringLifeCycler.GetInstanceID()) { return nil, errGatewayUnhealthy } var filteredIDs []string for _, tenantID := range tenantIDs { - subRing := GetShuffleShardingSubring(s.r, tenantID, s.limits) - // Include the user only if it belongs to this bloom gateway shard. 
- if subRing.HasInstance(s.instanceID) { + if s.OwnsTenant(tenantID) { filteredIDs = append(filteredIDs, tenantID) } } @@ -94,35 +91,35 @@ func getBucket(rangeMin, rangeMax, pos uint64) int { // FilterBlocks implements ShardingStrategy. func (s *ShuffleShardingStrategy) FilterBlocks(_ context.Context, tenantID string, blockRefs []BlockRef) ([]BlockRef, error) { - filteredBlockRefs := make([]BlockRef, 0, len(blockRefs)) + if !s.OwnsTenant(tenantID) { + return nil, nil + } - subRing := GetShuffleShardingSubring(s.r, tenantID, s.limits) + filteredBlockRefs := make([]BlockRef, 0, len(blockRefs)) - bufDescs, bufHosts, bufZones := ring.MakeBuffersForGet() - var rs ring.ReplicationSet - var err error + tenantRing := s.GetTenantSubRing(tenantID) + fpSharding := util_ring.NewFingerprintShuffleSharding(tenantRing, s.ringLifeCycler, BlocksOwnerSync) for _, blockRef := range blockRefs { - rs, err = subRing.Get(uint32(blockRef.FromFp), BlocksOwnerSync, bufDescs, bufHosts, bufZones) + owns, err := fpSharding.OwnsFingerprint(blockRef.FromFp) if err != nil { return nil, err } - // Include the block only if it belongs to this bloom gateway shard. - if rs.Includes(s.instanceID) { + if owns { filteredBlockRefs = append(filteredBlockRefs, blockRef) continue } - rs, err = subRing.Get(uint32(blockRef.ThroughFp), BlocksOwnerSync, bufDescs, bufHosts, bufZones) + owns, err = fpSharding.OwnsFingerprint(blockRef.ThroughFp) if err != nil { return nil, err } - // Include the block only if it belongs to this bloom gateway shard. - if rs.Includes(s.instanceID) { + if owns { filteredBlockRefs = append(filteredBlockRefs, blockRef) continue } } + return filteredBlockRefs, nil } diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index 2fe5353293ac9..4c14a4872655f 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -1260,9 +1260,7 @@ func (t *Loki) addCompactorMiddleware(h http.HandlerFunc) http.Handler { func (t *Loki) initBloomGateway() (services.Service, error) { logger := log.With(util_log.Logger, "component", "bloom-gateway") - instanceAddr := t.bloomGatewayRingManager.RingLifecycler.GetInstanceAddr() - instanceID := t.bloomGatewayRingManager.RingLifecycler.GetInstanceID() - shuffleSharding := bloomgateway.NewShuffleShardingStrategy(t.bloomGatewayRingManager.Ring, t.Overrides, instanceAddr, instanceID, logger) + shuffleSharding := bloomgateway.NewShuffleShardingStrategy(t.bloomGatewayRingManager.Ring, t.bloomGatewayRingManager.RingLifecycler, t.Overrides, logger) gateway, err := bloomgateway.New(t.Cfg.BloomGateway, t.Cfg.SchemaConfig, t.Cfg.StorageConfig, shuffleSharding, t.clientMetrics, logger, prometheus.DefaultRegisterer) if err != nil { @@ -1399,12 +1397,16 @@ func (t *Loki) initIndexGatewayInterceptors() (services.Service, error) { func (t *Loki) initBloomCompactor() (services.Service, error) { logger := log.With(util_log.Logger, "component", "bloom-compactor") - compactor, err := bloomcompactor.New(t.Cfg.BloomCompactor, - t.ring, + + shuffleSharding := bloomcompactor.NewShuffleShardingStrategy(t.bloomCompactorRingManager.Ring, t.bloomCompactorRingManager.RingLifecycler, t.Overrides) + + compactor, err := bloomcompactor.New( + t.Cfg.BloomCompactor, t.Cfg.StorageConfig, t.Cfg.SchemaConfig, t.Overrides, logger, + shuffleSharding, t.clientMetrics, prometheus.DefaultRegisterer) diff --git a/pkg/util/limiter/combined_limits.go b/pkg/util/limiter/combined_limits.go index 40d6fd508a4d4..848f677e05c9e 100644 --- a/pkg/util/limiter/combined_limits.go +++ b/pkg/util/limiter/combined_limits.go @@ -1,6 +1,7 @@ 
package limiter import ( + "github.com/grafana/loki/pkg/bloomcompactor" "github.com/grafana/loki/pkg/bloomgateway" "github.com/grafana/loki/pkg/compactor" "github.com/grafana/loki/pkg/distributor" @@ -24,4 +25,5 @@ type CombinedLimits interface { storage.StoreLimits indexgateway.Limits bloomgateway.Limits + bloomcompactor.Limits } diff --git a/pkg/util/ring/sharding.go b/pkg/util/ring/sharding.go new file mode 100644 index 0000000000000..cb549ec02bb90 --- /dev/null +++ b/pkg/util/ring/sharding.go @@ -0,0 +1,85 @@ +package ring + +import ( + "github.com/grafana/dskit/ring" + "github.com/prometheus/common/model" +) + +type TenantSharding interface { + GetTenantSubRing(tenantID string) ring.ReadRing + OwnsTenant(tenantID string) bool +} + +type TenantShuffleSharding struct { + r ring.ReadRing + ringLifeCycler *ring.BasicLifecycler + shardSizeForTenant func(tenantID string) int +} + +func NewTenantShuffleSharding( + r ring.ReadRing, + ringLifeCycler *ring.BasicLifecycler, + shardSizeForTenant func(tenantID string) int, +) *TenantShuffleSharding { + return &TenantShuffleSharding{ + r: r, + ringLifeCycler: ringLifeCycler, + shardSizeForTenant: shardSizeForTenant, + } +} + +func (s *TenantShuffleSharding) GetTenantSubRing(tenantID string) ring.ReadRing { + shardSize := s.shardSizeForTenant(tenantID) + + // A shard size of 0 means shuffle sharding is disabled for this specific user, + if shardSize <= 0 { + return s.r + } + + return s.r.ShuffleShard(tenantID, shardSize) +} + +func (s *TenantShuffleSharding) OwnsTenant(tenantID string) bool { + subRing := s.GetTenantSubRing(tenantID) + return subRing.HasInstance(s.ringLifeCycler.GetInstanceID()) +} + +type FingerprintSharding interface { + OwnsFingerprint(fp model.Fingerprint) (bool, error) +} + +// FingerprintShuffleSharding is not thread-safe. +type FingerprintShuffleSharding struct { + r ring.ReadRing + ringLifeCycler *ring.BasicLifecycler + ringOp ring.Operation + + // Buffers for ring.Get() calls. 
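These helpers are now shared by the bloom gateway and the bloom compactor: tenant-level ownership is a sub-ring membership check, and fingerprint-level ownership is a `ring.Get` lookup on that sub-ring. Note that the fingerprint variant below reuses the `ring.Get` buffers across calls, which is why it is documented as not thread-safe. A tenant-level usage sketch (`util_ring` is the import alias used by the compactor; `ringMgr` and `limits` are placeholders for values wired up elsewhere):

```go
tenantSharding := util_ring.NewTenantShuffleSharding(ringMgr.Ring, ringMgr.RingLifecycler, limits.BloomCompactorShardSize)
if tenantSharding.OwnsTenant("tenant-a") {
	// this instance is one of the BloomCompactorShardSize members of
	// tenant-a's shuffle shard (a shard size of 0 falls back to the whole ring)
}
```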
+ bufDescs []ring.InstanceDesc + bufHosts, bufZones []string +} + +func NewFingerprintShuffleSharding( + r ring.ReadRing, + ringLifeCycler *ring.BasicLifecycler, + ringOp ring.Operation, +) *FingerprintShuffleSharding { + s := FingerprintShuffleSharding{ + r: r, + ringLifeCycler: ringLifeCycler, + ringOp: ringOp, + } + + s.bufDescs, s.bufHosts, s.bufZones = ring.MakeBuffersForGet() + + return &s +} + +func (s *FingerprintShuffleSharding) OwnsFingerprint(fp uint64) (bool, error) { + rs, err := s.r.Get(uint32(fp), s.ringOp, s.bufDescs, s.bufHosts, s.bufZones) + if err != nil { + return false, err + } + + return rs.Includes(s.ringLifeCycler.GetInstanceAddr()), nil +} diff --git a/pkg/validation/limits.go b/pkg/validation/limits.go index c7cc4395d8f8a..90cbe29e567d2 100644 --- a/pkg/validation/limits.go +++ b/pkg/validation/limits.go @@ -177,8 +177,11 @@ type Limits struct { RequiredLabels []string `yaml:"required_labels,omitempty" json:"required_labels,omitempty" doc:"description=Define a list of required selector labels."` RequiredNumberLabels int `yaml:"minimum_labels_number,omitempty" json:"minimum_labels_number,omitempty" doc:"description=Minimum number of label matchers a query should contain."` - IndexGatewayShardSize int `yaml:"index_gateway_shard_size" json:"index_gateway_shard_size"` - BloomGatewayShardSize int `yaml:"bloom_gateway_shard_size" json:"bloom_gateway_shard_size"` + IndexGatewayShardSize int `yaml:"index_gateway_shard_size" json:"index_gateway_shard_size"` + BloomGatewayShardSize int `yaml:"bloom_gateway_shard_size" json:"bloom_gateway_shard_size"` + BloomCompactorShardSize int `yaml:"bloom_compactor_shard_size" json:"bloom_compactor_shard_size"` + BloomCompactorMaxTableAge time.Duration `yaml:"bloom_compactor_max_table_age" json:"bloom_compactor_max_table_age"` + BloomCompactorMinTableAge time.Duration `yaml:"bloom_compactor_min_table_age" json:"bloom_compactor_min_table_age"` AllowStructuredMetadata bool `yaml:"allow_structured_metadata,omitempty" json:"allow_structured_metadata,omitempty" doc:"description=Allow user to send structured metadata in push payload."` MaxStructuredMetadataSize flagext.ByteSize `yaml:"max_structured_metadata_size" json:"max_structured_metadata_size" doc:"description=Maximum size accepted for structured metadata per log line."` @@ -289,6 +292,9 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { f.IntVar(&l.IndexGatewayShardSize, "index-gateway.shard-size", 0, "The shard size defines how many index gateways should be used by a tenant for querying. If the global shard factor is 0, the global shard factor is set to the deprecated -replication-factor for backwards compatibility reasons.") f.IntVar(&l.BloomGatewayShardSize, "bloom-gateway.shard-size", 1, "The shard size defines how many bloom gateways should be used by a tenant for querying.") + f.IntVar(&l.BloomCompactorShardSize, "bloom-compactor.shard-size", 1, "The shard size defines how many bloom compactors should be used by a tenant when computing blooms. If it's set to 0, shuffle sharding is disabled.") + f.DurationVar(&l.BloomCompactorMaxTableAge, "bloom-compactor.max-table-age", 7*24*time.Hour, "The maximum age of a table before it is compacted. Do not compact tables older than the the configured time. Default to 7 days. 0s means no limit.") + f.DurationVar(&l.BloomCompactorMinTableAge, "bloom-compactor.min-table-age", 1*time.Hour, "The minimum age of a table before it is compacted. Do not compact tables newer than the the configured time. Default to 1 hour. 0s means no limit. 
This is useful to avoid compacting tables that will be updated with out-of-order writes.") l.ShardStreams = &shardstreams.Config{} l.ShardStreams.RegisterFlagsWithPrefix("shard-streams", f) @@ -768,6 +774,18 @@ func (o *Overrides) BloomGatewayShardSize(userID string) int { return o.getOverridesForUser(userID).BloomGatewayShardSize } +func (o *Overrides) BloomCompactorShardSize(userID string) int { + return o.getOverridesForUser(userID).BloomCompactorShardSize +} + +func (o *Overrides) BloomCompactorMaxTableAge(userID string) time.Duration { + return o.getOverridesForUser(userID).BloomCompactorMaxTableAge +} + +func (o *Overrides) BloomCompactorMinTableAge(userID string) time.Duration { + return o.getOverridesForUser(userID).BloomCompactorMinTableAge +} + func (o *Overrides) AllowStructuredMetadata(userID string) bool { return o.getOverridesForUser(userID).AllowStructuredMetadata }
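Since `initBloomCompactor` passes `t.Overrides` as the compactor's `Limits` and `CombinedLimits` now embeds `bloomcompactor.Limits`, `Overrides` has to satisfy the new interface; the accessors above complete that. A sketch of the compile-time guarantee and of reading the per-tenant knobs at run time (the assertion itself is illustrative and not part of this change):

```go
// Hypothetical compile-time check that Overrides implements the compactor limits.
var _ bloomcompactor.Limits = (*validation.Overrides)(nil)

// Per-tenant knobs consulted by the compactor:
shardSize := overrides.BloomCompactorShardSize("tenant-a") // 0 disables shuffle sharding for the tenant
maxAge := overrides.BloomCompactorMaxTableAge("tenant-a")  // 0 disables the upper age bound
minAge := overrides.BloomCompactorMinTableAge("tenant-a")  // 0 disables the lower age bound
```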