From 49d8a392e87580091c6f1f2b1e17452c00cd8ba8 Mon Sep 17 00:00:00 2001 From: Salva Corts Date: Tue, 31 Oct 2023 17:07:35 +0100 Subject: [PATCH 01/15] Bloom-compactor sharding --- docs/sources/configure/_index.md | 12 + pkg/bloomcompactor/bloomcompactor.go | 424 ++++++++++++++++++++-- pkg/bloomcompactor/bloomcompactor_test.go | 138 +++++++ pkg/bloomcompactor/config.go | 23 +- pkg/bloomcompactor/job.go | 43 +++ pkg/bloomcompactor/metrics.go | 92 +++++ pkg/bloomcompactor/sharding.go | 73 ++++ pkg/bloomcompactor/sharding_test.go | 137 +++++++ pkg/loki/modules.go | 11 +- pkg/util/limiter/combined_limits.go | 2 + pkg/validation/limits.go | 9 +- 11 files changed, 929 insertions(+), 35 deletions(-) create mode 100644 pkg/bloomcompactor/bloomcompactor_test.go create mode 100644 pkg/bloomcompactor/job.go create mode 100644 pkg/bloomcompactor/metrics.go create mode 100644 pkg/bloomcompactor/sharding.go create mode 100644 pkg/bloomcompactor/sharding_test.go diff --git a/docs/sources/configure/_index.md b/docs/sources/configure/_index.md index 349b7016a37a8..ad57b08f6bf37 100644 --- a/docs/sources/configure/_index.md +++ b/docs/sources/configure/_index.md @@ -2539,6 +2539,16 @@ ring: [working_directory: | default = ""] [max_look_back_period: ] + +[compaction_interval: ] + +[compaction_retries: ] + +[tables_to_compact: ] + +[skip_latest_n_tables: ] + +[max_compaction_parallelism: ] ``` ### limits_config @@ -2934,6 +2944,8 @@ shard_streams: # CLI flag: -bloom-gateway.shard-size [bloom_gateway_shard_size: | default = 1] +[bloom_compactor_shard_size: ] + # Allow user to send structured metadata in push payload. # CLI flag: -validation.allow-structured-metadata [allow_structured_metadata: | default = false] diff --git a/pkg/bloomcompactor/bloomcompactor.go b/pkg/bloomcompactor/bloomcompactor.go index 0608eac84a803..fdb2237b2853f 100644 --- a/pkg/bloomcompactor/bloomcompactor.go +++ b/pkg/bloomcompactor/bloomcompactor.go @@ -27,56 +27,91 @@ package bloomcompactor import ( "context" "fmt" + "math" + "math/rand" "os" "path/filepath" + "sort" "time" "github.com/go-kit/log" - "github.com/grafana/dskit/ring" + "github.com/go-kit/log/level" + "github.com/grafana/dskit/backoff" + "github.com/grafana/dskit/concurrency" + "github.com/grafana/dskit/multierror" "github.com/grafana/dskit/services" + "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/labels" + "github.com/grafana/loki/pkg/compactor/retention" "github.com/grafana/loki/pkg/storage" v1 "github.com/grafana/loki/pkg/storage/bloom/v1" "github.com/grafana/loki/pkg/storage/bloom/v1/filter" + chunk_client "github.com/grafana/loki/pkg/storage/chunk/client" "github.com/grafana/loki/pkg/storage/config" "github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper" + "github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper" + shipperindex "github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/index" + index_storage "github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/storage" + "github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/tsdb" + tsdbindex "github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/tsdb/index" + "github.com/grafana/loki/pkg/util" + util_log "github.com/grafana/loki/pkg/util/log" ) +// TODO: maybe we don't need all of them +type storeClient struct { + object chunk_client.ObjectClient + index index_storage.Client + chunk chunk_client.Client + indexShipper indexshipper.IndexShipper +} + type 
Compactor struct { services.Service - cfg Config - logger log.Logger - bloomCompactorRing ring.ReadRing - periodConfigs []config.PeriodConfig + cfg Config + logger log.Logger + schemaCfg config.SchemaConfig // temporary workaround until store has implemented read/write shipper interface bloomShipperClient bloomshipper.Client bloomStore bloomshipper.Store + + // Client used to run operations on the bucket storing bloom blocks. + storeClients map[config.DayTime]storeClient + + sharding ShardingStrategy + + metrics *metrics } -func New(cfg Config, - readRing ring.ReadRing, +func New( + cfg Config, + limits Limits, storageCfg storage.Config, - periodConfigs []config.PeriodConfig, + schemaCfg config.SchemaConfig, logger log.Logger, + sharding ShardingStrategy, clientMetrics storage.ClientMetrics, - _ prometheus.Registerer) (*Compactor, error) { + r prometheus.Registerer, +) (*Compactor, error) { c := &Compactor{ - cfg: cfg, - logger: logger, - bloomCompactorRing: readRing, - periodConfigs: periodConfigs, + cfg: cfg, + logger: logger, + schemaCfg: schemaCfg, + sharding: sharding, } - client, err := bloomshipper.NewBloomClient(periodConfigs, storageCfg, clientMetrics) + bloomClient, err := bloomshipper.NewBloomClient(schemaCfg.Configs, storageCfg, clientMetrics) if err != nil { return nil, err } shipper, err := bloomshipper.NewShipper( - client, + bloomClient, storageCfg.BloomShipperConfig, logger, ) @@ -90,22 +125,335 @@ func New(cfg Config, } // temporary workaround until store has implemented read/write shipper interface - c.bloomShipperClient = client + c.bloomShipperClient = bloomClient c.bloomStore = store - // TODO use a new service with a loop - c.Service = services.NewIdleService(c.starting, c.stopping) + + // Create object store clients + c.storeClients = make(map[config.DayTime]storeClient) + for i, periodicConfig := range schemaCfg.Configs { + var indexStorageCfg indexshipper.Config + switch periodicConfig.IndexType { + case config.TSDBType: + indexStorageCfg = storageCfg.TSDBShipperConfig.Config + case config.BoltDBShipperType: + indexStorageCfg = storageCfg.BoltDBShipperConfig.Config + default: + level.Warn(util_log.Logger).Log("msg", "skipping period because index type is unsupported") + continue + } + + objectClient, err := storage.NewObjectClient(periodicConfig.ObjectType, storageCfg, clientMetrics) + if err != nil { + return nil, fmt.Errorf("error creating object client '%s': %w", periodicConfig.ObjectType, err) + } + + periodEndTime := config.DayTime{Time: math.MaxInt64} + if i < len(schemaCfg.Configs)-1 { + periodEndTime = config.DayTime{Time: schemaCfg.Configs[i+1].From.Time.Add(-time.Millisecond)} + } + + indexShipper, err := indexshipper.NewIndexShipper( + periodicConfig.IndexTables.PathPrefix, + indexStorageCfg, + objectClient, + limits, + nil, + func(p string) (shipperindex.Index, error) { + return tsdb.OpenShippableTSDB(p, tsdb.IndexOpts{}) + }, + periodicConfig.GetIndexTableNumberRange(periodEndTime), + prometheus.WrapRegistererWithPrefix("loki_tsdb_shipper_", prometheus.DefaultRegisterer), + logger, + ) + if err != nil { + return nil, errors.Wrap(err, "create index shipper") + } + + c.storeClients[periodicConfig.From] = storeClient{ + object: objectClient, + index: index_storage.NewIndexStorageClient(objectClient, periodicConfig.IndexTables.PathPrefix), + chunk: chunk_client.NewClient(objectClient, nil, schemaCfg), + indexShipper: indexShipper, + } + } + + c.metrics = newMetrics(r) + c.metrics.compactionRunInterval.Set(cfg.CompactionInterval.Seconds()) + + c.Service = 
services.NewBasicService(c.starting, c.running, c.stopping) return c, nil } -func (c *Compactor) starting(_ context.Context) error { - return nil +func (c *Compactor) starting(_ context.Context) (err error) { + c.metrics.compactorRunning.Set(1) + return err +} + +func (c *Compactor) running(ctx context.Context) error { + // Run an initial compaction before starting the interval. + if err := c.runCompaction(ctx); err != nil { + level.Error(util_log.Logger).Log("msg", "failed to run compaction", "err", err) + } + + ticker := time.NewTicker(util.DurationWithJitter(c.cfg.CompactionInterval, 0.05)) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + c.metrics.compactionRunsStarted.Inc() + if err := c.runCompaction(ctx); err != nil { + c.metrics.compactionRunsErred.Inc() + level.Error(util_log.Logger).Log("msg", "failed to run compaction", "err", err) + continue + } + c.metrics.compactionRunsCompleted.Inc() + case <-ctx.Done(): + return nil + } + } } func (c *Compactor) stopping(_ error) error { + c.metrics.compactorRunning.Set(0) return nil } +func (c *Compactor) runCompaction(ctx context.Context) error { + var ( + tables []string + // it possible for two periods to use the same storage bucket and path prefix (different indexType or schema version) + // so more than one index storage client may end up listing the same set of buckets + // avoid including the same table twice in the compact tables list. + seen = make(map[string]struct{}) + ) + for _, sc := range c.storeClients { + // refresh index list cache since previous compaction would have changed the index files in the object store + sc.index.RefreshIndexTableNamesCache(ctx) + tbls, err := sc.index.ListTables(ctx) + if err != nil { + return fmt.Errorf("failed to list tables: %w", err) + } + + for _, table := range tbls { + if _, ok := seen[table]; ok { + continue + } + + tables = append(tables, table) + seen[table] = struct{}{} + } + } + + // process most recent tables first + sortTablesByRange(tables) + + // apply passed in compaction limits + if c.cfg.SkipLatestNTables <= len(tables) { + tables = tables[c.cfg.SkipLatestNTables:] + } + if c.cfg.TablesToCompact > 0 && c.cfg.TablesToCompact < len(tables) { + tables = tables[:c.cfg.TablesToCompact] + } + + // Reset discovered tenants metric since we will increase it in compactTable. 
+ c.metrics.compactionRunDiscoveredTenants.Set(0) + + parallelism := c.cfg.MaxCompactionParallelism + if parallelism == 0 { + parallelism = len(tables) + } + + errs := multierror.New() + if err := concurrency.ForEachJob(ctx, len(tables), parallelism, func(ctx context.Context, i int) error { + tableName := tables[i] + level.Info(util_log.Logger).Log("msg", "compacting table", "table-name", tableName) + err := c.compactTable(ctx, tableName) + if err != nil { + errs.Add(err) + return nil + } + level.Info(util_log.Logger).Log("msg", "finished compacting table", "table-name", tableName) + return nil + }); err != nil { + errs.Add(err) + } + + return errs.Err() +} + +func (c *Compactor) compactTable(ctx context.Context, tableName string) error { + schemaCfg, ok := schemaPeriodForTable(c.schemaCfg, tableName) + if !ok { + level.Error(util_log.Logger).Log("msg", "skipping compaction since we can't find schema for table", "table", tableName) + return nil + } + + sc, ok := c.storeClients[schemaCfg.From] + if !ok { + return fmt.Errorf("index store client not found for period starting at %s", schemaCfg.From.String()) + } + + _, tenants, err := sc.index.ListFiles(ctx, tableName, false) + if err != nil { + return fmt.Errorf("failed to list files for table %s: %w", tableName, err) + } + + c.metrics.compactionRunDiscoveredTenants.Add(float64(len(tenants))) + level.Info(c.logger).Log("msg", "discovered tenants from bucket", "users", len(tenants)) + return c.compactUsers(ctx, sc, tableName, tenants) +} + +// See: https://github.com/grafana/mimir/blob/34852137c332d4050e53128481f4f6417daee91e/pkg/compactor/compactor.go#L566-L689 +func (c *Compactor) compactUsers(ctx context.Context, sc storeClient, tableName string, tenants []string) error { + // When starting multiple compactor replicas nearly at the same time, running in a cluster with + // a large number of tenants, we may end up in a situation where the 1st user is compacted by + // multiple replicas at the same time. Shuffling users helps reduce the likelihood this will happen. + rand.Shuffle(len(tenants), func(i, j int) { + tenants[i], tenants[j] = tenants[j], tenants[i] + }) + + // Keep track of tenants owned by this shard, so that we can delete the local files for all other users. + errs := multierror.New() + ownedTenants := make(map[string]struct{}, len(tenants)) + for _, tenant := range tenants { + // Ensure the context has not been canceled (ie. compactor shutdown has been triggered). + if err := ctx.Err(); err != nil { + return fmt.Errorf("interrupting compaction of tenants: %w", err) + } + + // Ensure the tenant ID belongs to our shard. + owned, err := c.sharding.OwnsTenant(tenant) + if err != nil { + c.metrics.compactionRunSkippedTenants.Inc() + level.Warn(c.logger).Log("msg", "unable to check if tenant is owned by this shard", "tenantID", tenant, "err", err) + continue + } + if !owned { + c.metrics.compactionRunSkippedTenants.Inc() + level.Debug(c.logger).Log("msg", "skipping tenant because it is not owned by this shard", "tenantID", tenant) + continue + } + + ownedTenants[tenant] = struct{}{} + + if err := c.compactTenantWithRetries(ctx, sc, tableName, tenant); err != nil { + switch { + case errors.Is(err, context.Canceled): + // We don't want to count shutdowns as failed compactions because we will pick up with the rest of the compaction after the restart. 
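+				// Returning nil here abandons the remaining tenants for this table; they are picked up on the next compaction run after the restart.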
+ level.Info(c.logger).Log("msg", "compaction for tenant was interrupted by a shutdown", "tenant", tenant) + return nil + default: + c.metrics.compactionRunFailedTenants.Inc() + level.Error(c.logger).Log("msg", "failed to compact tenant", "tenant", tenant, "err", err) + errs.Add(err) + } + continue + } + + c.metrics.compactionRunSucceededTenants.Inc() + level.Info(c.logger).Log("msg", "successfully compacted tenant", "tenant", tenant) + } + + return errs.Err() + + // TODO: Delete local files for unowned tenants, if there are any. +} + +func (c *Compactor) compactTenant(ctx context.Context, sc storeClient, tableName string, tenant string) error { + level.Info(c.logger).Log("msg", "starting compaction of tenant", "tenant", tenant) + + // Ensure the context has not been canceled (ie. compactor shutdown has been triggered). + if err := ctx.Err(); err != nil { + return err + } + + // TODO: Use ForEachConcurrent? + errs := multierror.New() + if err := sc.indexShipper.ForEach(ctx, tableName, tenant, func(isMultiTenantIndex bool, idx shipperindex.Index) error { + if isMultiTenantIndex { + return fmt.Errorf("unexpected multi-tenant") + } + + // TODO: Make these casts safely + if err := idx.(*tsdb.TSDBFile).Index.(*tsdb.TSDBIndex).ForSeries( + ctx, nil, + 0, math.MaxInt64, // TODO: Replace with MaxLookBackPeriod + func(labels labels.Labels, fingerprint model.Fingerprint, chksMetas []tsdbindex.ChunkMeta) { + job := NewJob(tenant, tableName, fingerprint, chksMetas) + + ownsJob, err := c.sharding.OwnsJob(job) + if err != nil { + c.metrics.compactionRunSkippedJobs.Inc() + level.Error(c.logger).Log("msg", "failed to check if compactor owns job", "job", job, "err", err) + errs.Add(err) + return + } + if !ownsJob { + c.metrics.compactionRunSkippedJobs.Inc() + level.Debug(c.logger).Log("msg", "skipping job because it is not owned by this shard", "job", job) + return + } + + if err := c.runBloomCompact(ctx, sc, job); err != nil { + c.metrics.compactionRunFailedJobs.Inc() + errs.Add(errors.Wrap(err, "runBloomCompact")) + return + } + + c.metrics.compactionRunSucceededJobs.Inc() + }, + ); err != nil { + errs.Add(err) + } + + return nil + }); err != nil { + errs.Add(err) + } + + return errs.Err() +} + +func runWithRetries( + ctx context.Context, + minBackoff, maxBackoff time.Duration, + maxRetries int, + f func(ctx context.Context) error, +) error { + var lastErr error + + retries := backoff.New(ctx, backoff.Config{ + MinBackoff: minBackoff, + MaxBackoff: maxBackoff, + MaxRetries: maxRetries, + }) + + for retries.Ongoing() { + lastErr = f(ctx) + if lastErr == nil { + return nil + } + + retries.Wait() + } + + return lastErr +} + +func (c *Compactor) compactTenantWithRetries(ctx context.Context, sc storeClient, tableName string, tenant string) error { + return runWithRetries( + ctx, + c.cfg.retryMinBackoff, + c.cfg.retryMaxBackoff, + c.cfg.CompactionRetries, + func(ctx context.Context) error { + return c.compactTenant(ctx, sc, tableName, tenant) + }, + ) +} + // TODO Get fpRange owned by the compactor instance func NoopGetFingerprintRange() (uint64, uint64) { return 0, 0 } @@ -189,20 +537,17 @@ func (c *Compactor) compactNewChunks(ctx context.Context, dst string) (err error return nil } -func (c *Compactor) runCompact(ctx context.Context) error { +func (c *Compactor) runBloomCompact(ctx context.Context, _ storeClient, job Job) error { // TODO set MaxLookBackPeriod to Max ingester accepts maxLookBackPeriod := c.cfg.MaxLookBackPeriod - stFp, endFp := NoopGetFingerprintRange() - tenantID := NoopGetUserID() - end := 
time.Now().UTC().UnixMilli() start := end - maxLookBackPeriod.Milliseconds() metaSearchParams := bloomshipper.MetaSearchParams{ - TenantID: tenantID, - MinFingerprint: stFp, - MaxFingerprint: endFp, + TenantID: job.Tenant(), + MinFingerprint: uint64(job.Fingerprint()), + MaxFingerprint: uint64(job.Fingerprint()), StartTimestamp: start, EndTimestamp: end, } @@ -244,3 +589,28 @@ func (c *Compactor) runCompact(ctx context.Context) error { } return nil } + +// TODO: comes from pkg/compactor/compactor.go +func sortTablesByRange(tables []string) { + tableRanges := make(map[string]model.Interval) + for _, table := range tables { + tableRanges[table] = retention.ExtractIntervalFromTableName(table) + } + + sort.Slice(tables, func(i, j int) bool { + // less than if start time is after produces a most recent first sort order + return tableRanges[tables[i]].Start.After(tableRanges[tables[j]].Start) + }) + +} + +// TODO: comes from pkg/compactor/compactor.go +func schemaPeriodForTable(cfg config.SchemaConfig, tableName string) (config.PeriodConfig, bool) { + tableInterval := retention.ExtractIntervalFromTableName(tableName) + schemaCfg, err := cfg.SchemaForTime(tableInterval.Start) + if err != nil || schemaCfg.IndexTables.TableFor(tableInterval.Start) != tableName { + return config.PeriodConfig{}, false + } + + return schemaCfg, true +} diff --git a/pkg/bloomcompactor/bloomcompactor_test.go b/pkg/bloomcompactor/bloomcompactor_test.go new file mode 100644 index 0000000000000..aab11de62d86e --- /dev/null +++ b/pkg/bloomcompactor/bloomcompactor_test.go @@ -0,0 +1,138 @@ +package bloomcompactor + +import ( + "context" + "flag" + "fmt" + "path/filepath" + "testing" + "time" + + "github.com/grafana/dskit/flagext" + "github.com/grafana/dskit/kv/consul" + "github.com/grafana/dskit/ring" + ww "github.com/grafana/dskit/server" + "github.com/grafana/dskit/services" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/model" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/grafana/loki/pkg/compactor" + "github.com/grafana/loki/pkg/storage" + "github.com/grafana/loki/pkg/storage/chunk/client/local" + "github.com/grafana/loki/pkg/storage/config" + "github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper" + "github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/tsdb" + util_log "github.com/grafana/loki/pkg/util/log" + lokiring "github.com/grafana/loki/pkg/util/ring" + "github.com/grafana/loki/pkg/validation" +) + +const ( + indexTablePrefix = "table_" + workingDirName = "working-dir" +) + +func TestCompactor_RunCompaction(t *testing.T) { + servercfg := &ww.Config{} + require.Nil(t, servercfg.LogLevel.Set("debug")) + util_log.InitLogger(servercfg, nil, true, false) + + tempDir := t.TempDir() + indexDir := filepath.Join(tempDir, "index") + + schemaCfg := config.SchemaConfig{ + Configs: []config.PeriodConfig{ + { + From: config.DayTime{Time: model.Time(0)}, + IndexType: "tsdb", + ObjectType: "filesystem", + Schema: "v12", + IndexTables: config.IndexPeriodicTableConfig{ + PathPrefix: "index/", + PeriodicTableConfig: config.PeriodicTableConfig{ + Prefix: indexTablePrefix, + Period: config.ObjectStorageIndexRequiredPeriod, + }}, + }, + }, + } + + daySeconds := int64(24 * time.Hour / time.Second) + tableNumEnd := time.Now().Unix() / daySeconds + tableNumStart := tableNumEnd - 5 + for i := tableNumStart; i <= tableNumEnd; i++ { + compactor.SetupTable( + t, + filepath.Join(indexDir, fmt.Sprintf("%s%d", indexTablePrefix, i)), + 
compactor.IndexesConfig{ + NumUnCompactedFiles: 5, + NumCompactedFiles: 5, + }, + compactor.PerUserIndexesConfig{ + NumUsers: 5, + IndexesConfig: compactor.IndexesConfig{ + NumUnCompactedFiles: 5, + NumCompactedFiles: 5, + }, + }, + ) + } + + kvStore, cleanUp := consul.NewInMemoryClient(ring.GetCodec(), util_log.Logger, nil) + t.Cleanup(func() { assert.NoError(t, cleanUp.Close()) }) + + var cfg Config + flagext.DefaultValues(&cfg) + cfg.WorkingDirectory = filepath.Join(tempDir, workingDirName) + cfg.Ring.KVStore.Mock = kvStore + cfg.Ring.ListenPort = 0 + cfg.Ring.InstanceAddr = "bloomcompactor" + cfg.Ring.InstanceID = "bloomcompactor" + + storageConfig := storage.Config{ + FSConfig: local.FSConfig{Directory: tempDir}, + TSDBShipperConfig: tsdb.IndexCfg{ + Config: indexshipper.Config{ + ActiveIndexDirectory: indexDir, + ResyncInterval: 1 * time.Minute, + Mode: indexshipper.ModeReadWrite, + CacheLocation: filepath.Join(tempDir, "cache"), + }, + CachePostings: false, + }, + } + + var limits validation.Limits + limits.RegisterFlags(flag.NewFlagSet("limits", flag.PanicOnError)) + overrides, _ := validation.NewOverrides(limits, nil) + + clientMetrics := storage.NewClientMetrics() + t.Cleanup(clientMetrics.Unregister) + + ringManager, err := lokiring.NewRingManager("bloom-compactor", lokiring.ServerMode, cfg.Ring, 1, 1, util_log.Logger, prometheus.DefaultRegisterer) + require.NoError(t, err) + + err = ringManager.StartAsync(context.Background()) + require.NoError(t, err) + require.Eventually(t, func() bool { + return ringManager.State() == services.Running + }, 1*time.Minute, 100*time.Millisecond) + defer func() { + ringManager.StopAsync() + require.Eventually(t, func() bool { + return ringManager.State() == services.Terminated + }, 1*time.Minute, 100*time.Millisecond) + }() + + shuffleSharding := NewShuffleShardingStrategy(ringManager.Ring, ringManager.RingLifecycler, overrides) + + c, err := New(cfg, overrides, storageConfig, schemaCfg, util_log.Logger, shuffleSharding, clientMetrics, nil) + require.NoError(t, err) + + err = c.runCompaction(context.Background()) + require.NoError(t, err) + + // TODO: Once compaction is implemented, verify compaction here. +} diff --git a/pkg/bloomcompactor/config.go b/pkg/bloomcompactor/config.go index 04cc5dba8b948..a6a6b9ac6ae31 100644 --- a/pkg/bloomcompactor/config.go +++ b/pkg/bloomcompactor/config.go @@ -4,6 +4,7 @@ import ( "flag" "time" + "github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/downloads" "github.com/grafana/loki/pkg/util/ring" ) @@ -14,9 +15,20 @@ type Config struct { // section and the ingester configuration by default). Ring ring.RingConfig `yaml:"ring,omitempty" doc:"description=Defines the ring to be used by the bloom-compactor servers. In case this isn't configured, this block supports inheriting configuration from the common ring section."` // Enabled configures whether bloom-compactors should be used to compact index values into bloomfilters - Enabled bool `yaml:"enabled"` - WorkingDirectory string `yaml:"working_directory"` - MaxLookBackPeriod time.Duration `yaml:"max_look_back_period"` + Enabled bool `yaml:"enabled"` + WorkingDirectory string `yaml:"working_directory"` + MaxLookBackPeriod time.Duration `yaml:"max_look_back_period"` + CompactionInterval time.Duration `yaml:"compaction_interval"` + + // No need to add options to customize the retry backoff, + // given the defaults should be fine, but allow to override + // it in tests. 
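+	// The yaml:"-" tag keeps these fields out of the YAML configuration; they can only be set programmatically (e.g. from tests).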
+ retryMinBackoff time.Duration `yaml:"-"` + retryMaxBackoff time.Duration `yaml:"-"` + CompactionRetries int `yaml:"compaction_retries" category:"advanced"` + TablesToCompact int `yaml:"tables_to_compact"` + SkipLatestNTables int `yaml:"skip_latest_n_tables"` + MaxCompactionParallelism int `yaml:"max_compaction_parallelism"` } // RegisterFlags registers flags for the Bloom-Compactor configuration. @@ -24,3 +36,8 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { cfg.Ring.RegisterFlagsWithPrefix("bloom-compactor.", "collectors/", f) f.BoolVar(&cfg.Enabled, "bloom-compactor.enabled", false, "Flag to enable or disable the usage of the bloom-compactor component.") } + +type Limits interface { + downloads.Limits + BloomCompactorShardSize(tenantID string) int +} diff --git a/pkg/bloomcompactor/job.go b/pkg/bloomcompactor/job.go new file mode 100644 index 0000000000000..d6feb8c3cfe53 --- /dev/null +++ b/pkg/bloomcompactor/job.go @@ -0,0 +1,43 @@ +package bloomcompactor + +import ( + "github.com/prometheus/common/model" + + "github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/tsdb/index" +) + +// Job holds a compaction job, which consists of a group of blocks that should be compacted together. +// Not goroutine safe. +// TODO: A job should probably contain series or chunks +type Job struct { + tenantID string + tableName string + seriesFP model.Fingerprint + chunks []index.ChunkMeta +} + +// NewJob returns a new compaction Job. +func NewJob(tenantID string, tableName string, seriesFP model.Fingerprint, chunks []index.ChunkMeta) Job { + return Job{ + tenantID: tenantID, + tableName: tableName, + seriesFP: seriesFP, + chunks: chunks, + } +} + +func (j Job) String() string { + return j.tableName + "_" + j.tenantID + "_" + j.seriesFP.String() +} + +func (j Job) Tenant() string { + return j.tenantID +} + +func (j Job) Fingerprint() model.Fingerprint { + return j.seriesFP +} + +func (j Job) Chunks() []index.ChunkMeta { + return j.chunks +} diff --git a/pkg/bloomcompactor/metrics.go b/pkg/bloomcompactor/metrics.go new file mode 100644 index 0000000000000..2eb68b5cf0493 --- /dev/null +++ b/pkg/bloomcompactor/metrics.go @@ -0,0 +1,92 @@ +package bloomcompactor + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +const ( + metricsNamespace = "loki_bloomcompactor" +) + +type metrics struct { + compactionRunsStarted prometheus.Counter + compactionRunsCompleted prometheus.Counter + compactionRunsErred prometheus.Counter + compactionRunDiscoveredTenants prometheus.Gauge + compactionRunSkippedTenants prometheus.Gauge + compactionRunSucceededTenants prometheus.Gauge + compactionRunFailedTenants prometheus.Gauge + compactionRunSkippedJobs prometheus.Gauge + compactionRunSucceededJobs prometheus.Gauge + compactionRunFailedJobs prometheus.Gauge + compactionRunInterval prometheus.Gauge + compactorRunning prometheus.Gauge +} + +func newMetrics(r prometheus.Registerer) *metrics { + m := metrics{ + compactionRunsStarted: promauto.With(r).NewCounter(prometheus.CounterOpts{ + Namespace: metricsNamespace, + Name: "runs_started_total", + Help: "Total number of compactions started", + }), + compactionRunsCompleted: promauto.With(r).NewCounter(prometheus.CounterOpts{ + Namespace: metricsNamespace, + Name: "runs_completed_total", + Help: "Total number of compactions completed successfully", + }), + compactionRunsErred: promauto.With(r).NewCounter(prometheus.CounterOpts{ + Namespace: metricsNamespace, + Name: "runs_failed_total", + 
Help: "Total number of compaction runs failed", + }), + compactionRunDiscoveredTenants: promauto.With(r).NewGauge(prometheus.GaugeOpts{ + Namespace: metricsNamespace, + Name: "tenants_discovered", + Help: "Number of tenants discovered during the current compaction run", + }), + compactionRunSkippedTenants: promauto.With(r).NewGauge(prometheus.GaugeOpts{ + Namespace: metricsNamespace, + Name: "tenants_skipped", + Help: "Number of tenants skipped during the current compaction run", + }), + compactionRunSucceededTenants: promauto.With(r).NewGauge(prometheus.GaugeOpts{ + Namespace: metricsNamespace, + Name: "tenants_succeeded", + Help: "Number of tenants successfully processed during the current compaction run", + }), + compactionRunFailedTenants: promauto.With(r).NewGauge(prometheus.GaugeOpts{ + Namespace: metricsNamespace, + Name: "tenants_failed", + Help: "Number of tenants failed processing during the current compaction run", + }), + compactionRunSkippedJobs: promauto.With(r).NewGauge(prometheus.GaugeOpts{ + Namespace: metricsNamespace, + Name: "jobs_skipped", + Help: "Number of jobs skipped during the current compaction run", + }), + compactionRunSucceededJobs: promauto.With(r).NewGauge(prometheus.GaugeOpts{ + Namespace: metricsNamespace, + Name: "jobs_succeeded", + Help: "Number of jobs successfully processed during the current compaction run", + }), + compactionRunFailedJobs: promauto.With(r).NewGauge(prometheus.GaugeOpts{ + Namespace: metricsNamespace, + Name: "jobs_failed", + Help: "Number of jobs failed processing during the current compaction run", + }), + compactionRunInterval: promauto.With(r).NewGauge(prometheus.GaugeOpts{ + Namespace: metricsNamespace, + Name: "compaction_interval_seconds", + Help: "The configured interval on which compaction is run in seconds", + }), + compactorRunning: promauto.With(r).NewGauge(prometheus.GaugeOpts{ + Namespace: metricsNamespace, + Name: "running", + Help: "Value will be 1 if compactor is currently running on this instance", + }), + } + + return &m +} diff --git a/pkg/bloomcompactor/sharding.go b/pkg/bloomcompactor/sharding.go new file mode 100644 index 0000000000000..8586e75fbdddb --- /dev/null +++ b/pkg/bloomcompactor/sharding.go @@ -0,0 +1,73 @@ +package bloomcompactor + +import ( + "github.com/grafana/dskit/ring" +) + +var ( + // TODO: Should we include LEAVING instances in the replication set? + RingOp = ring.NewOp([]ring.InstanceState{ring.JOINING, ring.ACTIVE}, nil) +) + +// ShardingStrategy describes whether compactor "owns" given user or job. +type ShardingStrategy interface { + OwnsTenant(tenant string) (bool, error) + OwnsJob(job Job) (bool, error) +} + +type ShuffleShardingStrategy struct { + ring *ring.Ring + ringLifeCycler *ring.BasicLifecycler + limits Limits + + // Buffers to avoid allocations in ring.Get + bufDescs []ring.InstanceDesc + bufHosts, bufZones []string +} + +func NewShuffleShardingStrategy(r *ring.Ring, ringLifecycler *ring.BasicLifecycler, limits Limits) *ShuffleShardingStrategy { + s := ShuffleShardingStrategy{ + ring: r, + ringLifeCycler: ringLifecycler, + limits: limits, + } + s.bufDescs, s.bufHosts, s.bufZones = ring.MakeBuffersForGet() + + return &s +} + +// getShuffleShardingSubring returns the subring to be used for a given user. 
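+// ShuffleShard is deterministic for a given tenant and shard size, so compactor replicas that observe the same ring state agree on the tenant's subring.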
+func (s *ShuffleShardingStrategy) getShuffleShardingSubring(tenantID string) ring.ReadRing { + shardSize := s.limits.BloomCompactorShardSize(tenantID) + + // A shard size of 0 means shuffle sharding is disabled for this specific user, + // so we just return the full ring so that blocks will be sharded across all compactors. + if shardSize <= 0 { + return s.ring + } + + return s.ring.ShuffleShard(tenantID, shardSize) +} + +func (s *ShuffleShardingStrategy) OwnsTenant(tenantID string) (bool, error) { + subRing := s.getShuffleShardingSubring(tenantID) + return subRing.HasInstance(s.ringLifeCycler.GetInstanceID()), nil +} + +// OwnsJob makes sure only a single compactor should execute the job. +// TODO: Pretty similar to sharding strategy in pkg/bloomgateway/sharding.go +func (s *ShuffleShardingStrategy) OwnsJob(job Job) (bool, error) { + // We check again if we own the tenant + subRing := s.getShuffleShardingSubring(job.Tenant()) + ownsTenant := subRing.HasInstance(s.ringLifeCycler.GetInstanceID()) + if !ownsTenant { + return false, nil + } + + rs, err := subRing.Get(uint32(job.Fingerprint()), RingOp, s.bufDescs, s.bufHosts, s.bufZones) + if err != nil { + return false, err + } + + return rs.Includes(s.ringLifeCycler.GetInstanceAddr()), nil +} diff --git a/pkg/bloomcompactor/sharding_test.go b/pkg/bloomcompactor/sharding_test.go new file mode 100644 index 0000000000000..c18673fbf3513 --- /dev/null +++ b/pkg/bloomcompactor/sharding_test.go @@ -0,0 +1,137 @@ +package bloomcompactor + +import ( + "context" + "flag" + "fmt" + "testing" + "time" + + "github.com/grafana/dskit/services" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/model" + "github.com/stretchr/testify/require" + + "github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/downloads" + util_log "github.com/grafana/loki/pkg/util/log" + lokiring "github.com/grafana/loki/pkg/util/ring" + "github.com/grafana/loki/pkg/validation" +) + +func TestShuffleSharding(t *testing.T) { + const shardSize = 2 + const rings = 4 + const tenants = 2000 + const jobsPerTenant = 200 + + var limits validation.Limits + limits.RegisterFlags(flag.NewFlagSet("limits", flag.PanicOnError)) + overrides, err := validation.NewOverrides(limits, nil) + require.NoError(t, err) + + var ringManagers []*lokiring.RingManager + var shards []*ShuffleShardingStrategy + for i := 0; i < rings; i++ { + var ringCfg lokiring.RingConfig + ringCfg.RegisterFlagsWithPrefix("", "", flag.NewFlagSet("ring", flag.PanicOnError)) + ringCfg.KVStore.Store = "inmemory" + ringCfg.InstanceID = fmt.Sprintf("bloom-compactor-%d", i) + ringCfg.InstanceAddr = fmt.Sprintf("localhost-%d", i) + + ringManager, err := lokiring.NewRingManager("bloom-compactor", lokiring.ServerMode, ringCfg, 1, 1, util_log.Logger, prometheus.NewRegistry()) + require.NoError(t, err) + require.NoError(t, ringManager.StartAsync(context.Background())) + + sharding := NewShuffleShardingStrategy(ringManager.Ring, ringManager.RingLifecycler, mockLimits{ + Limits: overrides, + bloomCompactorShardSize: shardSize, + }) + + ringManagers = append(ringManagers, ringManager) + shards = append(shards, sharding) + } + + // Wait for all rings to see each other. 
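+	// StartAsync is non-blocking, so poll until every manager reports Running and its ring view contains all instances.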
+ for i := 0; i < rings; i++ { + require.Eventually(t, func() bool { + running := ringManagers[i].State() == services.Running + discovered := ringManagers[i].Ring.InstancesCount() == rings + return running && discovered + }, 1*time.Minute, 100*time.Millisecond) + } + + // This is kind of an un-deterministic test, because sharding is random + // and the seed is initialized by the ring lib. + // Here we'll generate a bunch of tenants and test that if the sharding doesn't own the tenant, + // that's because the tenant is owned by other ring instances. + shard := shards[0] + otherShards := shards[1:] + var ownedTenants, ownedJobs int + for i := 0; i < tenants; i++ { + tenant := fmt.Sprintf("tenant-%d", i) + ownsTenant, err := shard.OwnsTenant(tenant) + require.NoError(t, err) + + var tenantOwnedByOther int + for _, other := range otherShards { + otherOwns, err := other.OwnsTenant(tenant) + require.NoError(t, err) + if otherOwns { + tenantOwnedByOther++ + } + } + + // If this shard owns the tenant, shardSize-1 other members should also own the tenant. + // Otherwise, shardSize other members should own the tenant. + if ownsTenant { + require.Equal(t, shardSize-1, tenantOwnedByOther) + ownedTenants++ + } else { + require.Equal(t, shardSize, tenantOwnedByOther) + } + + for j := 0; j < jobsPerTenant; j++ { + job := NewJob(tenant, "table", model.Fingerprint(i), nil) + ownsJob, err := shard.OwnsJob(job) + require.NoError(t, err) + + var jobOwnedByOther int + for _, other := range otherShards { + otherOwns, err := other.OwnsJob(job) + require.NoError(t, err) + if otherOwns { + jobOwnedByOther++ + } + } + + // If this shard owns the job, no one else should own the job. + // And if this shard doesn't own the job, only one of the other shards should own the job. + if ownsJob { + require.Equal(t, 0, jobOwnedByOther) + ownedJobs++ + } else { + require.Equal(t, 1, jobOwnedByOther) + } + } + } + + t.Logf("owned tenants: %d (out of %d)", ownedTenants, tenants) + t.Logf("owned jobs: %d (out of %d)", ownedJobs, tenants*jobsPerTenant) + + // Stop all rings and wait for them to stop. 
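+	// StopAsync is non-blocking as well, so wait for each manager to reach Terminated before the test finishes.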
+ for i := 0; i < rings; i++ { + ringManagers[i].StopAsync() + require.Eventually(t, func() bool { + return ringManagers[i].State() == services.Terminated + }, 1*time.Minute, 100*time.Millisecond) + } +} + +type mockLimits struct { + downloads.Limits + bloomCompactorShardSize int +} + +func (m mockLimits) BloomCompactorShardSize(_ string) int { + return m.bloomCompactorShardSize +} diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index ccd1839117cb5..e451ceb9806a8 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -1392,11 +1392,16 @@ func (t *Loki) initIndexGatewayInterceptors() (services.Service, error) { func (t *Loki) initBloomCompactor() (services.Service, error) { logger := log.With(util_log.Logger, "component", "bloom-compactor") - compactor, err := bloomcompactor.New(t.Cfg.BloomCompactor, - t.ring, + + shuffleSharding := bloomcompactor.NewShuffleShardingStrategy(t.bloomCompactorRingManager.Ring, t.bloomCompactorRingManager.RingLifecycler, t.Overrides) + + compactor, err := bloomcompactor.New( + t.Cfg.BloomCompactor, + t.Overrides, t.Cfg.StorageConfig, - t.Cfg.SchemaConfig.Configs, + t.Cfg.SchemaConfig, logger, + shuffleSharding, t.clientMetrics, prometheus.DefaultRegisterer) diff --git a/pkg/util/limiter/combined_limits.go b/pkg/util/limiter/combined_limits.go index 40d6fd508a4d4..848f677e05c9e 100644 --- a/pkg/util/limiter/combined_limits.go +++ b/pkg/util/limiter/combined_limits.go @@ -1,6 +1,7 @@ package limiter import ( + "github.com/grafana/loki/pkg/bloomcompactor" "github.com/grafana/loki/pkg/bloomgateway" "github.com/grafana/loki/pkg/compactor" "github.com/grafana/loki/pkg/distributor" @@ -24,4 +25,5 @@ type CombinedLimits interface { storage.StoreLimits indexgateway.Limits bloomgateway.Limits + bloomcompactor.Limits } diff --git a/pkg/validation/limits.go b/pkg/validation/limits.go index 0600b9ebb5ffc..dcec8cb181aad 100644 --- a/pkg/validation/limits.go +++ b/pkg/validation/limits.go @@ -181,8 +181,9 @@ type Limits struct { RequiredLabels []string `yaml:"required_labels,omitempty" json:"required_labels,omitempty" doc:"description=Define a list of required selector labels."` RequiredNumberLabels int `yaml:"minimum_labels_number,omitempty" json:"minimum_labels_number,omitempty" doc:"description=Minimum number of label matchers a query should contain."` - IndexGatewayShardSize int `yaml:"index_gateway_shard_size" json:"index_gateway_shard_size"` - BloomGatewayShardSize int `yaml:"bloom_gateway_shard_size" json:"bloom_gateway_shard_size"` + IndexGatewayShardSize int `yaml:"index_gateway_shard_size" json:"index_gateway_shard_size"` + BloomGatewayShardSize int `yaml:"bloom_gateway_shard_size" json:"bloom_gateway_shard_size"` + BloomCompactorShardSize int `yaml:"bloom_compactor_shard_size" json:"bloom_compactor_shard_size"` AllowStructuredMetadata bool `yaml:"allow_structured_metadata,omitempty" json:"allow_structured_metadata,omitempty" doc:"description=Allow user to send structured metadata in push payload."` MaxStructuredMetadataSize flagext.ByteSize `yaml:"max_structured_metadata_size" json:"max_structured_metadata_size" doc:"description=Maximum size accepted for structured metadata per log line."` @@ -787,6 +788,10 @@ func (o *Overrides) BloomGatewayShardSize(userID string) int { return o.getOverridesForUser(userID).BloomGatewayShardSize } +func (o *Overrides) BloomCompactorShardSize(userID string) int { + return o.getOverridesForUser(userID).BloomCompactorShardSize +} + func (o *Overrides) AllowStructuredMetadata(userID string) bool { return 
o.getOverridesForUser(userID).AllowStructuredMetadata } From 85b6faea843b7b98b9e070e59d715f564e4fdc00 Mon Sep 17 00:00:00 2001 From: Salva Corts Date: Wed, 8 Nov 2023 10:31:29 +0100 Subject: [PATCH 02/15] merge main followups --- docs/sources/configure/_index.md | 5 ++- pkg/bloomcompactor/bloomcompactor.go | 40 +++++++++++------------ pkg/bloomcompactor/bloomcompactor_test.go | 16 ++++----- pkg/validation/limits.go | 1 + 4 files changed, 30 insertions(+), 32 deletions(-) diff --git a/docs/sources/configure/_index.md b/docs/sources/configure/_index.md index 030511dd499f3..9058e13a0df8f 100644 --- a/docs/sources/configure/_index.md +++ b/docs/sources/configure/_index.md @@ -2937,7 +2937,10 @@ shard_streams: # CLI flag: -bloom-gateway.shard-size [bloom_gateway_shard_size: | default = 1] -[bloom_compactor_shard_size: ] +# The shard size defines how many bloom compactors should be used by a tenant +# when computing blooms. +# CLI flag: -bloom-compactor.shard-size +[bloom_compactor_shard_size: | default = 0] # Allow user to send structured metadata in push payload. # CLI flag: -validation.allow-structured-metadata diff --git a/pkg/bloomcompactor/bloomcompactor.go b/pkg/bloomcompactor/bloomcompactor.go index 1fd1a650926a2..0afba0d9706bb 100644 --- a/pkg/bloomcompactor/bloomcompactor.go +++ b/pkg/bloomcompactor/bloomcompactor.go @@ -32,40 +32,38 @@ import ( "math/rand" "os" "path/filepath" + "sort" "strconv" "strings" - "sort" "time" - "github.com/go-kit/log/level" - "github.com/pkg/errors" - "github.com/prometheus/common/model" - - "github.com/grafana/loki/pkg/logproto" - "github.com/grafana/loki/pkg/storage/chunk" - chunk_client "github.com/grafana/loki/pkg/storage/chunk/client" - shipperindex "github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/index" - index_storage "github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/storage" - "github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/tsdb" - tsdbindex "github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/tsdb/index" - util_log "github.com/grafana/loki/pkg/util/log" - "github.com/go-kit/log" + "github.com/go-kit/log/level" "github.com/grafana/dskit/backoff" "github.com/grafana/dskit/concurrency" "github.com/grafana/dskit/multierror" "github.com/grafana/dskit/services" + "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" "github.com/grafana/loki/pkg/compactor/retention" + "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/storage" v1 "github.com/grafana/loki/pkg/storage/bloom/v1" "github.com/grafana/loki/pkg/storage/bloom/v1/filter" + "github.com/grafana/loki/pkg/storage/chunk" + chunk_client "github.com/grafana/loki/pkg/storage/chunk/client" "github.com/grafana/loki/pkg/storage/config" - "github.com/grafana/loki/pkg/util" "github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper" "github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper" + shipperindex "github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/index" + index_storage "github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/storage" + "github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/tsdb" + tsdbindex "github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/tsdb/index" + "github.com/grafana/loki/pkg/util" + util_log "github.com/grafana/loki/pkg/util/log" ) const ( @@ -103,7 +101,7 @@ func New( storageCfg storage.Config, schemaConfig config.SchemaConfig, limits 
Limits, -logger log.Logger, + logger log.Logger, sharding ShardingStrategy, clientMetrics storage.ClientMetrics, r prometheus.Registerer, @@ -135,7 +133,7 @@ logger log.Logger, continue } - //Configure ObjectClient and IndexShipper for series and chunk management + // Configure ObjectClient and IndexShipper for series and chunk management objectClient, err := storage.NewObjectClient(periodicConfig.ObjectType, storageCfg, clientMetrics) if err != nil { return nil, fmt.Errorf("error creating object client '%s': %w", periodicConfig.ObjectType, err) @@ -482,7 +480,7 @@ func makeChunkRefs(chksMetas []tsdbindex.ChunkMeta, tenant string, fp model.Fing func buildBloomBlock(bloomForChks v1.SeriesWithBloom, series Series, workingDir string) (bloomshipper.Block, error) { localDst := createLocalDirName(workingDir, series) - //write bloom to a local dir + // write bloom to a local dir builder, err := v1.NewBlockBuilder(v1.NewBlockOptions(), v1.NewDirectoryBlockWriter(localDst)) if err != nil { level.Info(util_log.Logger).Log("creating builder", err) @@ -519,7 +517,7 @@ func buildBloomBlock(bloomForChks v1.SeriesWithBloom, series Series, workingDir Ref: bloomshipper.Ref{ TenantID: series.tenant, TableName: series.tableName, - MinFingerprint: uint64(series.fingerPrint), //TODO will change once we compact multiple blooms into a block + MinFingerprint: uint64(series.fingerPrint), // TODO will change once we compact multiple blooms into a block MaxFingerprint: uint64(series.fingerPrint), StartTimestamp: series.from.Unix(), EndTimestamp: series.through.Unix(), @@ -607,7 +605,7 @@ func CompactNewChunks(ctx context.Context, series Series, bt *v1.BloomTokenizer, Blocks: storedBlockRefs, } - //TODO move this to an outer layer, otherwise creates a meta per block + // TODO move this to an outer layer, otherwise creates a meta per block err = bloomShipperClient.PutMeta(ctx, meta) if err != nil { level.Info(util_log.Logger).Log("putting meta.json to storage", err) @@ -698,7 +696,7 @@ func (c *Compactor) runCompact(ctx context.Context, bloomShipperClient bloomship for _, meta := range metas { for _, blockRef := range meta.Blocks { uniqueIndexPaths[blockRef.IndexPath] = struct{}{} - //... + // ... 
} } diff --git a/pkg/bloomcompactor/bloomcompactor_test.go b/pkg/bloomcompactor/bloomcompactor_test.go index aab11de62d86e..a57ca82a606e5 100644 --- a/pkg/bloomcompactor/bloomcompactor_test.go +++ b/pkg/bloomcompactor/bloomcompactor_test.go @@ -23,7 +23,6 @@ import ( "github.com/grafana/loki/pkg/storage/chunk/client/local" "github.com/grafana/loki/pkg/storage/config" "github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper" - "github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/tsdb" util_log "github.com/grafana/loki/pkg/util/log" lokiring "github.com/grafana/loki/pkg/util/ring" "github.com/grafana/loki/pkg/validation" @@ -93,14 +92,11 @@ func TestCompactor_RunCompaction(t *testing.T) { storageConfig := storage.Config{ FSConfig: local.FSConfig{Directory: tempDir}, - TSDBShipperConfig: tsdb.IndexCfg{ - Config: indexshipper.Config{ - ActiveIndexDirectory: indexDir, - ResyncInterval: 1 * time.Minute, - Mode: indexshipper.ModeReadWrite, - CacheLocation: filepath.Join(tempDir, "cache"), - }, - CachePostings: false, + TSDBShipperConfig: indexshipper.Config{ + ActiveIndexDirectory: indexDir, + ResyncInterval: 1 * time.Minute, + Mode: indexshipper.ModeReadWrite, + CacheLocation: filepath.Join(tempDir, "cache"), }, } @@ -128,7 +124,7 @@ func TestCompactor_RunCompaction(t *testing.T) { shuffleSharding := NewShuffleShardingStrategy(ringManager.Ring, ringManager.RingLifecycler, overrides) - c, err := New(cfg, overrides, storageConfig, schemaCfg, util_log.Logger, shuffleSharding, clientMetrics, nil) + c, err := New(cfg, storageConfig, schemaCfg, overrides, util_log.Logger, shuffleSharding, clientMetrics, nil) require.NoError(t, err) err = c.runCompaction(context.Background()) diff --git a/pkg/validation/limits.go b/pkg/validation/limits.go index dcec8cb181aad..df9c608e2bccc 100644 --- a/pkg/validation/limits.go +++ b/pkg/validation/limits.go @@ -298,6 +298,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { f.IntVar(&l.IndexGatewayShardSize, "index-gateway.shard-size", 0, "The shard size defines how many index gateways should be used by a tenant for querying. If the global shard factor is 0, the global shard factor is set to the deprecated -replication-factor for backwards compatibility reasons.") f.IntVar(&l.BloomGatewayShardSize, "bloom-gateway.shard-size", 1, "The shard size defines how many bloom gateways should be used by a tenant for querying.") + f.IntVar(&l.BloomCompactorShardSize, "bloom-compactor.shard-size", 0, "The shard size defines how many bloom compactors should be used by a tenant when computing blooms.") l.ShardStreams = &shardstreams.Config{} l.ShardStreams.RegisterFlagsWithPrefix("shard-streams", f) From 18fdaa27f61d1f87b21feab55862b6c1e01d8c38 Mon Sep 17 00:00:00 2001 From: Salva Corts Date: Wed, 8 Nov 2023 11:02:09 +0100 Subject: [PATCH 03/15] Remove duplicated logic inside runCompact --- pkg/bloomcompactor/bloomcompactor.go | 179 +++++++++------------------ pkg/bloomcompactor/job.go | 31 +++-- 2 files changed, 79 insertions(+), 131 deletions(-) diff --git a/pkg/bloomcompactor/bloomcompactor.go b/pkg/bloomcompactor/bloomcompactor.go index 0afba0d9706bb..2f28c3f1d83d3 100644 --- a/pkg/bloomcompactor/bloomcompactor.go +++ b/pkg/bloomcompactor/bloomcompactor.go @@ -33,8 +33,6 @@ import ( "os" "path/filepath" "sort" - "strconv" - "strings" "time" "github.com/go-kit/log" @@ -365,6 +363,9 @@ func (c *Compactor) compactTenant(ctx context.Context, sc storeClient, tableName return err } + // Tokenizer is not thread-safe so we need one per goroutine. 
+ bt, _ := v1.NewBloomTokenizer(prometheus.DefaultRegisterer) + // TODO: Use ForEachConcurrent? errs := multierror.New() if err := sc.indexShipper.ForEach(ctx, tableName, tenant, func(isMultiTenantIndex bool, idx shipperindex.Index) error { @@ -377,7 +378,7 @@ func (c *Compactor) compactTenant(ctx context.Context, sc storeClient, tableName ctx, nil, 0, math.MaxInt64, // TODO: Replace with MaxLookBackPeriod func(labels labels.Labels, fingerprint model.Fingerprint, chksMetas []tsdbindex.ChunkMeta) { - job := NewJob(tenant, tableName, fingerprint, chksMetas) + job := NewJob(tenant, tableName, idx.Path(), fingerprint, labels, chksMetas) ownsJob, err := c.sharding.OwnsJob(job) if err != nil { @@ -392,7 +393,7 @@ func (c *Compactor) compactTenant(ctx context.Context, sc storeClient, tableName return } - if err := c.runCompact(ctx, c.bloomShipperClient, sc, job); err != nil { + if err := c.runCompact(ctx, c.bloomShipperClient, bt, sc, job); err != nil { c.metrics.compactionRunFailedJobs.Inc() errs.Add(errors.Wrap(err, "runBloomCompact")) return @@ -531,40 +532,6 @@ func buildBloomBlock(bloomForChks v1.SeriesWithBloom, series Series, workingDir return blocks, nil } -// TODO Will be replaced with ring implementation in https://github.com/grafana/loki/pull/11154/ -func listSeriesForBlooms(ctx context.Context, objectClient storeClient) ([]Series, error) { - // Returns all the TSDB files, including subdirectories - prefix := "index/" - indices, _, err := objectClient.object.List(ctx, prefix, "") - - if err != nil { - return nil, err - } - - var result []Series - - for _, index := range indices { - s := strings.Split(index.Key, "/") - - if len(s) > 3 { - tableName := s[1] - - if !strings.HasPrefix(tableName, "loki_") || strings.Contains(tableName, "backup") { - continue - } - - userID := s[2] - _, err := strconv.Atoi(userID) - if err != nil { - continue - } - - result = append(result, Series{tableName: tableName, tenant: userID, indexPath: index.Key}) - } - } - return result, nil -} - func createLocalDirName(workingDir string, series Series) string { dir := fmt.Sprintf("bloomBlock-%s-%s-%s-%s-%s-%s", series.tableName, series.tenant, series.fingerPrint, series.fingerPrint, series.from, series.through) return filepath.Join(workingDir, dir) @@ -615,99 +582,65 @@ func CompactNewChunks(ctx context.Context, series Series, bt *v1.BloomTokenizer, return nil } -func (c *Compactor) runCompact(ctx context.Context, bloomShipperClient bloomshipper.Client, storeClient storeClient, _ Job) error { +func (c *Compactor) runCompact(ctx context.Context, bloomShipperClient bloomshipper.Client, bt *v1.BloomTokenizer, storeClient storeClient, job Job) error { + // TODO call bloomShipperClient.GetMetas to get existing meta.json + var metas []bloomshipper.Meta - series, err := listSeriesForBlooms(ctx, storeClient) - - // TODO tokenizer is not thread-safe - // consider moving to Job/worker level with https://github.com/grafana/loki/pull/11154/ - // create a tokenizer - bt, _ := v1.NewBloomTokenizer(prometheus.DefaultRegisterer) + if len(metas) == 0 { + // Get chunks data from list of chunkRefs + chks, err := storeClient.chunk.GetChunks( + ctx, + makeChunkRefs(job.Chunks(), job.Tenant(), job.Fingerprint()), + ) + if err != nil { + return err + } - if err != nil { - return err - } + // effectively get min and max of timestamps of the list of chunks in a series + // There must be a better way to get this, ordering chunkRefs by timestamp doesn't fully solve it + // chunk files name have this info in ObjectStore, but it's not really 
exposed + minFrom := model.Latest + maxThrough := model.Earliest - for _, s := range series { - err := storeClient.indexShipper.ForEach(ctx, s.tableName, s.tenant, func(isMultiTenantIndex bool, idx shipperindex.Index) error { - if isMultiTenantIndex { - return nil + for _, c := range chks { + if minFrom > c.From { + minFrom = c.From + } + if maxThrough < c.From { + maxThrough = c.Through } + } - // TODO make this casting safe - _ = idx.(*tsdb.TSDBFile).Index.(*tsdb.TSDBIndex).ForSeries( - ctx, - nil, // Process all shards - 0, math.MaxInt64, // Replace with MaxLookBackPeriod - - // Get chunks for a series label and a fp - func(ls labels.Labels, fp model.Fingerprint, chksMetas []tsdbindex.ChunkMeta) { - - // TODO call bloomShipperClient.GetMetas to get existing meta.json - var metas []bloomshipper.Meta - - if len(metas) == 0 { - // Get chunks data from list of chunkRefs - chks, err := storeClient.chunk.GetChunks( - ctx, - makeChunkRefs(chksMetas, s.tenant, fp), - ) - if err != nil { - level.Info(util_log.Logger).Log("getting chunks", err) - return - } - - // effectively get min and max of timestamps of the list of chunks in a series - // There must be a better way to get this, ordering chunkRefs by timestamp doesn't fully solve it - // chunk files name have this info in ObjectStore, but it's not really exposed - minFrom := model.Latest - maxThrough := model.Earliest - - for _, c := range chks { - if minFrom > c.From { - minFrom = c.From - } - if maxThrough < c.From { - maxThrough = c.Through - } - } - - series := Series{ - tableName: s.tableName, - tenant: s.tenant, - labels: ls, - fingerPrint: fp, - chunks: chks, - from: minFrom, - through: maxThrough, - indexPath: s.indexPath, - } - - err = CompactNewChunks(ctx, series, bt, bloomShipperClient, c.cfg.WorkingDirectory) - if err != nil { - return - } - } else { - // TODO complete part 2 - periodic compaction for delta from previous period - // When already compacted metas exists - // Deduplicate index paths - uniqueIndexPaths := make(map[string]struct{}) - - for _, meta := range metas { - for _, blockRef := range meta.Blocks { - uniqueIndexPaths[blockRef.IndexPath] = struct{}{} - // ... - } - } - - } - }) - return nil - }) + series := Series{ + tableName: job.tableName, + tenant: job.Tenant(), + labels: job.Labels(), + fingerPrint: job.Fingerprint(), + chunks: chks, + from: minFrom, + through: maxThrough, + indexPath: job.IndexPath(), + } + + err = CompactNewChunks(ctx, series, bt, bloomShipperClient, c.cfg.WorkingDirectory) if err != nil { - return errors.Wrap(err, "getting each series") + return err } + } else { + // TODO complete part 2 - periodic compaction for delta from previous period + // When already compacted metas exists + // Deduplicate index paths + uniqueIndexPaths := make(map[string]struct{}) + + for _, meta := range metas { + for _, blockRef := range meta.Blocks { + uniqueIndexPaths[blockRef.IndexPath] = struct{}{} + // ... + } + } + } + return nil } diff --git a/pkg/bloomcompactor/job.go b/pkg/bloomcompactor/job.go index d6feb8c3cfe53..95d716b69470a 100644 --- a/pkg/bloomcompactor/job.go +++ b/pkg/bloomcompactor/job.go @@ -2,26 +2,33 @@ package bloomcompactor import ( "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/labels" "github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/tsdb/index" ) -// Job holds a compaction job, which consists of a group of blocks that should be compacted together. -// Not goroutine safe. 
-// TODO: A job should probably contain series or chunks type Job struct { - tenantID string - tableName string - seriesFP model.Fingerprint - chunks []index.ChunkMeta + tableName, tenantID, indexPath string + seriesLbs labels.Labels + seriesFP model.Fingerprint + chunks []index.ChunkMeta } // NewJob returns a new compaction Job. -func NewJob(tenantID string, tableName string, seriesFP model.Fingerprint, chunks []index.ChunkMeta) Job { +func NewJob( + tenantID string, + tableName string, + indexPath string, + seriesFP model.Fingerprint, + seriesLbs labels.Labels, + chunks []index.ChunkMeta, +) Job { return Job{ tenantID: tenantID, tableName: tableName, + indexPath: indexPath, seriesFP: seriesFP, + seriesLbs: seriesLbs, chunks: chunks, } } @@ -41,3 +48,11 @@ func (j Job) Fingerprint() model.Fingerprint { func (j Job) Chunks() []index.ChunkMeta { return j.chunks } + +func (j Job) Labels() labels.Labels { + return j.seriesLbs +} + +func (j Job) IndexPath() string { + return j.indexPath +} From 5211754299a783b4a5e682f0f61179003a2e170c Mon Sep 17 00:00:00 2001 From: Salva Corts Date: Wed, 8 Nov 2023 11:32:18 +0100 Subject: [PATCH 04/15] WIP apply CR feedback --- pkg/bloomcompactor/bloomcompactor.go | 34 +++++++++++------------ pkg/bloomcompactor/bloomcompactor_test.go | 4 +-- pkg/bloomcompactor/config.go | 24 ++++++++++------ pkg/bloomcompactor/metrics.go | 15 +++++++++- pkg/bloomcompactor/sharding.go | 7 +---- 5 files changed, 49 insertions(+), 35 deletions(-) diff --git a/pkg/bloomcompactor/bloomcompactor.go b/pkg/bloomcompactor/bloomcompactor.go index 2f28c3f1d83d3..c8988d4babeb8 100644 --- a/pkg/bloomcompactor/bloomcompactor.go +++ b/pkg/bloomcompactor/bloomcompactor.go @@ -127,7 +127,7 @@ func New( case config.BoltDBShipperType: indexStorageCfg = storageCfg.BoltDBShipperConfig.Config default: - level.Warn(util_log.Logger).Log("msg", "skipping period because index type is unsupported") + level.Warn(c.logger).Log("msg", "skipping period because index type is unsupported") continue } @@ -187,7 +187,7 @@ func (c *Compactor) starting(_ context.Context) (err error) { func (c *Compactor) running(ctx context.Context) error { // Run an initial compaction before starting the interval. 
if err := c.runCompaction(ctx); err != nil { - level.Error(util_log.Logger).Log("msg", "failed to run compaction", "err", err) + level.Error(c.logger).Log("msg", "failed to run compaction", "err", err) } ticker := time.NewTicker(util.DurationWithJitter(c.cfg.CompactionInterval, 0.05)) @@ -199,7 +199,7 @@ func (c *Compactor) running(ctx context.Context) error { c.metrics.compactionRunsStarted.Inc() if err := c.runCompaction(ctx); err != nil { c.metrics.compactionRunsErred.Inc() - level.Error(util_log.Logger).Log("msg", "failed to run compaction", "err", err) + level.Error(c.logger).Log("msg", "failed to run compaction", "err", err) continue } c.metrics.compactionRunsCompleted.Inc() @@ -260,19 +260,17 @@ func (c *Compactor) runCompaction(ctx context.Context) error { } errs := multierror.New() - if err := concurrency.ForEachJob(ctx, len(tables), parallelism, func(ctx context.Context, i int) error { + _ = concurrency.ForEachJob(ctx, len(tables), parallelism, func(ctx context.Context, i int) error { tableName := tables[i] - level.Info(util_log.Logger).Log("msg", "compacting table", "table-name", tableName) + level.Info(c.logger).Log("msg", "compacting table", "table-name", tableName) err := c.compactTable(ctx, tableName) if err != nil { errs.Add(err) return nil } - level.Info(util_log.Logger).Log("msg", "finished compacting table", "table-name", tableName) + level.Info(c.logger).Log("msg", "finished compacting table", "table-name", tableName) return nil - }); err != nil { - errs.Add(err) - } + }) return errs.Err() } @@ -280,7 +278,7 @@ func (c *Compactor) runCompaction(ctx context.Context) error { func (c *Compactor) compactTable(ctx context.Context, tableName string) error { schemaCfg, ok := schemaPeriodForTable(c.schemaCfg, tableName) if !ok { - level.Error(util_log.Logger).Log("msg", "skipping compaction since we can't find schema for table", "table", tableName) + level.Error(c.logger).Log("msg", "skipping compaction since we can't find schema for table", "table", tableName) return nil } @@ -312,6 +310,8 @@ func (c *Compactor) compactUsers(ctx context.Context, sc storeClient, tableName errs := multierror.New() ownedTenants := make(map[string]struct{}, len(tenants)) for _, tenant := range tenants { + logger := log.With(c.logger, "tenant", tenant) + // Ensure the context has not been canceled (ie. compactor shutdown has been triggered). if err := ctx.Err(); err != nil { return fmt.Errorf("interrupting compaction of tenants: %w", err) @@ -321,12 +321,12 @@ func (c *Compactor) compactUsers(ctx context.Context, sc storeClient, tableName owned, err := c.sharding.OwnsTenant(tenant) if err != nil { c.metrics.compactionRunSkippedTenants.Inc() - level.Warn(c.logger).Log("msg", "unable to check if tenant is owned by this shard", "tenantID", tenant, "err", err) + level.Warn(logger).Log("msg", "unable to check if tenant is owned by this shard", "err", err) continue } if !owned { c.metrics.compactionRunSkippedTenants.Inc() - level.Debug(c.logger).Log("msg", "skipping tenant because it is not owned by this shard", "tenantID", tenant) + level.Debug(logger).Log("msg", "skipping tenant because it is not owned by this shard") continue } @@ -336,18 +336,18 @@ func (c *Compactor) compactUsers(ctx context.Context, sc storeClient, tableName switch { case errors.Is(err, context.Canceled): // We don't want to count shutdowns as failed compactions because we will pick up with the rest of the compaction after the restart. 
- level.Info(c.logger).Log("msg", "compaction for tenant was interrupted by a shutdown", "tenant", tenant) + level.Info(logger).Log("msg", "compaction for tenant was interrupted by a shutdown") return nil default: c.metrics.compactionRunFailedTenants.Inc() - level.Error(c.logger).Log("msg", "failed to compact tenant", "tenant", tenant, "err", err) + level.Error(logger).Log("msg", "failed to compact tenant", "err", err) errs.Add(err) } continue } c.metrics.compactionRunSucceededTenants.Inc() - level.Info(c.logger).Log("msg", "successfully compacted tenant", "tenant", tenant) + level.Info(logger).Log("msg", "successfully compacted tenant") } return errs.Err() @@ -442,8 +442,8 @@ func runWithRetries( func (c *Compactor) compactTenantWithRetries(ctx context.Context, sc storeClient, tableName string, tenant string) error { return runWithRetries( ctx, - c.cfg.retryMinBackoff, - c.cfg.retryMaxBackoff, + c.cfg.RetryMinBackoff, + c.cfg.RetryMaxBackoff, c.cfg.CompactionRetries, func(ctx context.Context) error { return c.compactTenant(ctx, sc, tableName, tenant) diff --git a/pkg/bloomcompactor/bloomcompactor_test.go b/pkg/bloomcompactor/bloomcompactor_test.go index a57ca82a606e5..a3947439a5fde 100644 --- a/pkg/bloomcompactor/bloomcompactor_test.go +++ b/pkg/bloomcompactor/bloomcompactor_test.go @@ -11,7 +11,7 @@ import ( "github.com/grafana/dskit/flagext" "github.com/grafana/dskit/kv/consul" "github.com/grafana/dskit/ring" - ww "github.com/grafana/dskit/server" + "github.com/grafana/dskit/server" "github.com/grafana/dskit/services" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" @@ -34,7 +34,7 @@ const ( ) func TestCompactor_RunCompaction(t *testing.T) { - servercfg := &ww.Config{} + servercfg := &server.Config{} require.Nil(t, servercfg.LogLevel.Set("debug")) util_log.InitLogger(servercfg, nil, true, false) diff --git a/pkg/bloomcompactor/config.go b/pkg/bloomcompactor/config.go index 7d8a965170adf..94314f7205d9c 100644 --- a/pkg/bloomcompactor/config.go +++ b/pkg/bloomcompactor/config.go @@ -19,21 +19,27 @@ type Config struct { WorkingDirectory string `yaml:"working_directory"` CompactionInterval time.Duration `yaml:"compaction_interval"` - // No need to add options to customize the retry backoff, - // given the defaults should be fine, but allow to override - // it in tests. - retryMinBackoff time.Duration `yaml:"-"` - retryMaxBackoff time.Duration `yaml:"-"` - CompactionRetries int `yaml:"compaction_retries" category:"advanced"` - TablesToCompact int `yaml:"tables_to_compact"` - SkipLatestNTables int `yaml:"skip_latest_n_tables"` - MaxCompactionParallelism int `yaml:"max_compaction_parallelism"` + RetryMinBackoff time.Duration `yaml:"compaction_retries_min_backoff"` + RetryMaxBackoff time.Duration `yaml:"compaction_retries_max_backoff"` + CompactionRetries int `yaml:"compaction_retries"` + + TablesToCompact int `yaml:"tables_to_compact"` + SkipLatestNTables int `yaml:"skip_latest_n_tables"` + MaxCompactionParallelism int `yaml:"max_compaction_parallelism"` } // RegisterFlags registers flags for the Bloom-Compactor configuration. 
func (cfg *Config) RegisterFlags(f *flag.FlagSet) { cfg.Ring.RegisterFlagsWithPrefix("bloom-compactor.", "collectors/", f) f.BoolVar(&cfg.Enabled, "bloom-compactor.enabled", false, "Flag to enable or disable the usage of the bloom-compactor component.") + f.StringVar(&cfg.WorkingDirectory, "bloom-compactor.working-directory", "", "Directory where files can be downloaded for compaction.") + f.DurationVar(&cfg.CompactionInterval, "bloom-compactor.compaction-interval", 10*time.Minute, "Interval at which to re-run the compaction operation.") + f.DurationVar(&cfg.RetryMinBackoff, "bloom-compactor.compaction-retries-min-backoff", 10*time.Second, "Minimum backoff time between retries.") + f.DurationVar(&cfg.RetryMaxBackoff, "bloom-compactor.compaction-retries-max-backoff", time.Minute, "Maximum backoff time between retries.") + f.IntVar(&cfg.CompactionRetries, "bloom-compactor.compaction-retries", 3, "Number of retries to perform when compaction fails.") + f.IntVar(&cfg.TablesToCompact, "bloom-compactor.tables-to-compact", 0, "Number of tables that compactor will try to compact. Newer tables are chosen when this is less than the number of tables available.") + f.IntVar(&cfg.SkipLatestNTables, "bloom-compactor.skip-latest-n-tables", 0, "Do not compact N latest tables.") + f.IntVar(&cfg.MaxCompactionParallelism, "bloom-compactor.max-compaction-parallelism", 1, "Maximum number of tables to compact in parallel. While increasing this value, please make sure compactor has enough disk space allocated to be able to store and compact as many tables.") } type Limits interface { diff --git a/pkg/bloomcompactor/metrics.go b/pkg/bloomcompactor/metrics.go index 2eb68b5cf0493..e5a71609fd189 100644 --- a/pkg/bloomcompactor/metrics.go +++ b/pkg/bloomcompactor/metrics.go @@ -6,7 +6,8 @@ import ( ) const ( - metricsNamespace = "loki_bloomcompactor" + metricsNamespace = "loki" + metricsSubsystem = "bloomcompactor" ) type metrics struct { @@ -28,61 +29,73 @@ func newMetrics(r prometheus.Registerer) *metrics { m := metrics{ compactionRunsStarted: promauto.With(r).NewCounter(prometheus.CounterOpts{ Namespace: metricsNamespace, + Subsystem: metricsSubsystem, Name: "runs_started_total", Help: "Total number of compactions started", }), compactionRunsCompleted: promauto.With(r).NewCounter(prometheus.CounterOpts{ Namespace: metricsNamespace, + Subsystem: metricsSubsystem, Name: "runs_completed_total", Help: "Total number of compactions completed successfully", }), compactionRunsErred: promauto.With(r).NewCounter(prometheus.CounterOpts{ Namespace: metricsNamespace, + Subsystem: metricsSubsystem, Name: "runs_failed_total", Help: "Total number of compaction runs failed", }), compactionRunDiscoveredTenants: promauto.With(r).NewGauge(prometheus.GaugeOpts{ Namespace: metricsNamespace, + Subsystem: metricsSubsystem, Name: "tenants_discovered", Help: "Number of tenants discovered during the current compaction run", }), compactionRunSkippedTenants: promauto.With(r).NewGauge(prometheus.GaugeOpts{ Namespace: metricsNamespace, + Subsystem: metricsSubsystem, Name: "tenants_skipped", Help: "Number of tenants skipped during the current compaction run", }), compactionRunSucceededTenants: promauto.With(r).NewGauge(prometheus.GaugeOpts{ Namespace: metricsNamespace, + Subsystem: metricsSubsystem, Name: "tenants_succeeded", Help: "Number of tenants successfully processed during the current compaction run", }), compactionRunFailedTenants: promauto.With(r).NewGauge(prometheus.GaugeOpts{ Namespace: metricsNamespace, + Subsystem: metricsSubsystem, 
Name: "tenants_failed", Help: "Number of tenants failed processing during the current compaction run", }), compactionRunSkippedJobs: promauto.With(r).NewGauge(prometheus.GaugeOpts{ Namespace: metricsNamespace, + Subsystem: metricsSubsystem, Name: "jobs_skipped", Help: "Number of jobs skipped during the current compaction run", }), compactionRunSucceededJobs: promauto.With(r).NewGauge(prometheus.GaugeOpts{ Namespace: metricsNamespace, + Subsystem: metricsSubsystem, Name: "jobs_succeeded", Help: "Number of jobs successfully processed during the current compaction run", }), compactionRunFailedJobs: promauto.With(r).NewGauge(prometheus.GaugeOpts{ Namespace: metricsNamespace, + Subsystem: metricsSubsystem, Name: "jobs_failed", Help: "Number of jobs failed processing during the current compaction run", }), compactionRunInterval: promauto.With(r).NewGauge(prometheus.GaugeOpts{ Namespace: metricsNamespace, + Subsystem: metricsSubsystem, Name: "compaction_interval_seconds", Help: "The configured interval on which compaction is run in seconds", }), compactorRunning: promauto.With(r).NewGauge(prometheus.GaugeOpts{ Namespace: metricsNamespace, + Subsystem: metricsSubsystem, Name: "running", Help: "Value will be 1 if compactor is currently running on this instance", }), diff --git a/pkg/bloomcompactor/sharding.go b/pkg/bloomcompactor/sharding.go index 8586e75fbdddb..9d97c0444750e 100644 --- a/pkg/bloomcompactor/sharding.go +++ b/pkg/bloomcompactor/sharding.go @@ -19,10 +19,6 @@ type ShuffleShardingStrategy struct { ring *ring.Ring ringLifeCycler *ring.BasicLifecycler limits Limits - - // Buffers to avoid allocations in ring.Get - bufDescs []ring.InstanceDesc - bufHosts, bufZones []string } func NewShuffleShardingStrategy(r *ring.Ring, ringLifecycler *ring.BasicLifecycler, limits Limits) *ShuffleShardingStrategy { @@ -31,7 +27,6 @@ func NewShuffleShardingStrategy(r *ring.Ring, ringLifecycler *ring.BasicLifecycl ringLifeCycler: ringLifecycler, limits: limits, } - s.bufDescs, s.bufHosts, s.bufZones = ring.MakeBuffersForGet() return &s } @@ -64,7 +59,7 @@ func (s *ShuffleShardingStrategy) OwnsJob(job Job) (bool, error) { return false, nil } - rs, err := subRing.Get(uint32(job.Fingerprint()), RingOp, s.bufDescs, s.bufHosts, s.bufZones) + rs, err := subRing.Get(uint32(job.Fingerprint()), RingOp, nil, nil, nil) if err != nil { return false, err } From 12b31f4408311927667dd95d3cb58ed044002802 Mon Sep 17 00:00:00 2001 From: Salva Corts Date: Wed, 8 Nov 2023 11:34:47 +0100 Subject: [PATCH 05/15] Fix test --- pkg/bloomcompactor/sharding_test.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pkg/bloomcompactor/sharding_test.go b/pkg/bloomcompactor/sharding_test.go index c18673fbf3513..22eac1641e7b1 100644 --- a/pkg/bloomcompactor/sharding_test.go +++ b/pkg/bloomcompactor/sharding_test.go @@ -10,6 +10,7 @@ import ( "github.com/grafana/dskit/services" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/labels" "github.com/stretchr/testify/require" "github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/downloads" @@ -91,7 +92,8 @@ func TestShuffleSharding(t *testing.T) { } for j := 0; j < jobsPerTenant; j++ { - job := NewJob(tenant, "table", model.Fingerprint(i), nil) + lbls := labels.FromStrings("namespace", fmt.Sprintf("namespace-%d", j)) + job := NewJob(tenant, "", "", model.Fingerprint(lbls.Hash()), lbls, nil) ownsJob, err := shard.OwnsJob(job) require.NoError(t, err) From 
e0fa94528d22f97ba617216381185067495f891f Mon Sep 17 00:00:00 2001 From: Salva Corts Date: Wed, 8 Nov 2023 12:43:23 +0100 Subject: [PATCH 06/15] Chnage metrics to counter --- docs/sources/configure/_index.md | 37 ++++++++++++++++++++++------ pkg/bloomcompactor/bloomcompactor.go | 3 --- pkg/bloomcompactor/metrics.go | 28 ++++++++++----------- 3 files changed, 44 insertions(+), 24 deletions(-) diff --git a/docs/sources/configure/_index.md b/docs/sources/configure/_index.md index 9058e13a0df8f..dbdaad5490f11 100644 --- a/docs/sources/configure/_index.md +++ b/docs/sources/configure/_index.md @@ -2531,17 +2531,40 @@ ring: # CLI flag: -bloom-compactor.enabled [enabled: | default = false] +# Directory where files can be downloaded for compaction. +# CLI flag: -bloom-compactor.working-directory [working_directory: | default = ""] -[compaction_interval: ] +# Interval at which to re-run the compaction operation. +# CLI flag: -bloom-compactor.compaction-interval +[compaction_interval: | default = 10m] + +# Minimum backoff time between retries. +# CLI flag: -bloom-compactor.compaction-retries-min-backoff +[compaction_retries_min_backoff: | default = 10s] -[compaction_retries: ] +# Maximum backoff time between retries. +# CLI flag: -bloom-compactor.compaction-retries-max-backoff +[compaction_retries_max_backoff: | default = 1m] -[tables_to_compact: ] +# Number of retries to perform when compaction fails. +# CLI flag: -bloom-compactor.compaction-retries +[compaction_retries: | default = 3] -[skip_latest_n_tables: ] +# Number of tables that compactor will try to compact. Newer tables are chosen +# when this is less than the number of tables available. +# CLI flag: -bloom-compactor.tables-to-compact +[tables_to_compact: | default = 0] -[max_compaction_parallelism: ] +# Do not compact N latest tables. +# CLI flag: -bloom-compactor.skip-latest-n-tables +[skip_latest_n_tables: | default = 0] + +# Maximum number of tables to compact in parallel. While increasing this value, +# please make sure compactor has enough disk space allocated to be able to store +# and compact as many tables. +# CLI flag: -bloom-compactor.max-compaction-parallelism +[max_compaction_parallelism: | default = 1] ``` ### limits_config @@ -2938,9 +2961,9 @@ shard_streams: [bloom_gateway_shard_size: | default = 1] # The shard size defines how many bloom compactors should be used by a tenant -# when computing blooms. +# when computing blooms. If it's set to 0, shuffle sharding is disabled. # CLI flag: -bloom-compactor.shard-size -[bloom_compactor_shard_size: | default = 0] +[bloom_compactor_shard_size: | default = 1] # Allow user to send structured metadata in push payload. # CLI flag: -validation.allow-structured-metadata diff --git a/pkg/bloomcompactor/bloomcompactor.go b/pkg/bloomcompactor/bloomcompactor.go index c8988d4babeb8..d93e430d93e52 100644 --- a/pkg/bloomcompactor/bloomcompactor.go +++ b/pkg/bloomcompactor/bloomcompactor.go @@ -251,9 +251,6 @@ func (c *Compactor) runCompaction(ctx context.Context) error { tables = tables[:c.cfg.TablesToCompact] } - // Reset discovered tenants metric since we will increase it in compactTable. 
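// This reset is dropped because these metrics become counters later in this patch
// (see metrics.go below): a Prometheus counter has no Set and only ever increases.
// With Namespace "loki" and Subsystem "bloomcompactor" the metric is published as
// loki_bloomcompactor_tenants_discovered, so per-run figures are read with a range
// query instead — an illustrative (hypothetical) PromQL expression:
//
//	increase(loki_bloomcompactor_tenants_discovered[1h])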
- c.metrics.compactionRunDiscoveredTenants.Set(0) - parallelism := c.cfg.MaxCompactionParallelism if parallelism == 0 { parallelism = len(tables) diff --git a/pkg/bloomcompactor/metrics.go b/pkg/bloomcompactor/metrics.go index e5a71609fd189..73c4d3865541e 100644 --- a/pkg/bloomcompactor/metrics.go +++ b/pkg/bloomcompactor/metrics.go @@ -14,13 +14,13 @@ type metrics struct { compactionRunsStarted prometheus.Counter compactionRunsCompleted prometheus.Counter compactionRunsErred prometheus.Counter - compactionRunDiscoveredTenants prometheus.Gauge - compactionRunSkippedTenants prometheus.Gauge - compactionRunSucceededTenants prometheus.Gauge - compactionRunFailedTenants prometheus.Gauge - compactionRunSkippedJobs prometheus.Gauge - compactionRunSucceededJobs prometheus.Gauge - compactionRunFailedJobs prometheus.Gauge + compactionRunDiscoveredTenants prometheus.Counter + compactionRunSkippedTenants prometheus.Counter + compactionRunSucceededTenants prometheus.Counter + compactionRunFailedTenants prometheus.Counter + compactionRunSkippedJobs prometheus.Counter + compactionRunSucceededJobs prometheus.Counter + compactionRunFailedJobs prometheus.Counter compactionRunInterval prometheus.Gauge compactorRunning prometheus.Gauge } @@ -45,43 +45,43 @@ func newMetrics(r prometheus.Registerer) *metrics { Name: "runs_failed_total", Help: "Total number of compaction runs failed", }), - compactionRunDiscoveredTenants: promauto.With(r).NewGauge(prometheus.GaugeOpts{ + compactionRunDiscoveredTenants: promauto.With(r).NewCounter(prometheus.CounterOpts{ Namespace: metricsNamespace, Subsystem: metricsSubsystem, Name: "tenants_discovered", Help: "Number of tenants discovered during the current compaction run", }), - compactionRunSkippedTenants: promauto.With(r).NewGauge(prometheus.GaugeOpts{ + compactionRunSkippedTenants: promauto.With(r).NewCounter(prometheus.CounterOpts{ Namespace: metricsNamespace, Subsystem: metricsSubsystem, Name: "tenants_skipped", Help: "Number of tenants skipped during the current compaction run", }), - compactionRunSucceededTenants: promauto.With(r).NewGauge(prometheus.GaugeOpts{ + compactionRunSucceededTenants: promauto.With(r).NewCounter(prometheus.CounterOpts{ Namespace: metricsNamespace, Subsystem: metricsSubsystem, Name: "tenants_succeeded", Help: "Number of tenants successfully processed during the current compaction run", }), - compactionRunFailedTenants: promauto.With(r).NewGauge(prometheus.GaugeOpts{ + compactionRunFailedTenants: promauto.With(r).NewCounter(prometheus.CounterOpts{ Namespace: metricsNamespace, Subsystem: metricsSubsystem, Name: "tenants_failed", Help: "Number of tenants failed processing during the current compaction run", }), - compactionRunSkippedJobs: promauto.With(r).NewGauge(prometheus.GaugeOpts{ + compactionRunSkippedJobs: promauto.With(r).NewCounter(prometheus.CounterOpts{ Namespace: metricsNamespace, Subsystem: metricsSubsystem, Name: "jobs_skipped", Help: "Number of jobs skipped during the current compaction run", }), - compactionRunSucceededJobs: promauto.With(r).NewGauge(prometheus.GaugeOpts{ + compactionRunSucceededJobs: promauto.With(r).NewCounter(prometheus.CounterOpts{ Namespace: metricsNamespace, Subsystem: metricsSubsystem, Name: "jobs_succeeded", Help: "Number of jobs successfully processed during the current compaction run", }), - compactionRunFailedJobs: promauto.With(r).NewGauge(prometheus.GaugeOpts{ + compactionRunFailedJobs: promauto.With(r).NewCounter(prometheus.CounterOpts{ Namespace: metricsNamespace, Subsystem: metricsSubsystem, Name: 
"jobs_failed", From 27240d7fbc4ffee0dbfaf43c2fe4117202c9dbb5 Mon Sep 17 00:00:00 2001 From: Salva Corts Date: Wed, 8 Nov 2023 12:44:43 +0100 Subject: [PATCH 07/15] Change default shard size --- pkg/validation/limits.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/validation/limits.go b/pkg/validation/limits.go index df9c608e2bccc..4447489fe75f1 100644 --- a/pkg/validation/limits.go +++ b/pkg/validation/limits.go @@ -298,7 +298,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { f.IntVar(&l.IndexGatewayShardSize, "index-gateway.shard-size", 0, "The shard size defines how many index gateways should be used by a tenant for querying. If the global shard factor is 0, the global shard factor is set to the deprecated -replication-factor for backwards compatibility reasons.") f.IntVar(&l.BloomGatewayShardSize, "bloom-gateway.shard-size", 1, "The shard size defines how many bloom gateways should be used by a tenant for querying.") - f.IntVar(&l.BloomCompactorShardSize, "bloom-compactor.shard-size", 0, "The shard size defines how many bloom compactors should be used by a tenant when computing blooms.") + f.IntVar(&l.BloomCompactorShardSize, "bloom-compactor.shard-size", 1, "The shard size defines how many bloom compactors should be used by a tenant when computing blooms. If it's set to 0, shuffle sharding is disabled.") l.ShardStreams = &shardstreams.Config{} l.ShardStreams.RegisterFlagsWithPrefix("shard-streams", f) From 368f8f0fd6c0bbfcd0258b1c1b6332dab5e3f3bb Mon Sep 17 00:00:00 2001 From: Salva Corts Date: Wed, 8 Nov 2023 12:45:36 +0100 Subject: [PATCH 08/15] Extract common logic for sharding from bloom-gw and bloom-compactor into a ring utils lib --- pkg/bloomcompactor/sharding.go | 42 ++++------------- pkg/bloomgateway/sharding.go | 48 +++++++++---------- pkg/loki/modules.go | 4 +- pkg/util/ring/sharding.go | 85 ++++++++++++++++++++++++++++++++++ 4 files changed, 116 insertions(+), 63 deletions(-) create mode 100644 pkg/util/ring/sharding.go diff --git a/pkg/bloomcompactor/sharding.go b/pkg/bloomcompactor/sharding.go index 9d97c0444750e..10a023827167a 100644 --- a/pkg/bloomcompactor/sharding.go +++ b/pkg/bloomcompactor/sharding.go @@ -2,6 +2,7 @@ package bloomcompactor import ( "github.com/grafana/dskit/ring" + util_ring "github.com/grafana/loki/pkg/util/ring" ) var ( @@ -11,58 +12,31 @@ var ( // ShardingStrategy describes whether compactor "owns" given user or job. type ShardingStrategy interface { - OwnsTenant(tenant string) (bool, error) + util_ring.TenantSharding OwnsJob(job Job) (bool, error) } type ShuffleShardingStrategy struct { - ring *ring.Ring + util_ring.TenantSharding ringLifeCycler *ring.BasicLifecycler - limits Limits } func NewShuffleShardingStrategy(r *ring.Ring, ringLifecycler *ring.BasicLifecycler, limits Limits) *ShuffleShardingStrategy { s := ShuffleShardingStrategy{ - ring: r, + TenantSharding: util_ring.NewTenantShuffleSharding(r, ringLifecycler, limits.BloomCompactorShardSize), ringLifeCycler: ringLifecycler, - limits: limits, } return &s } -// getShuffleShardingSubring returns the subring to be used for a given user. -func (s *ShuffleShardingStrategy) getShuffleShardingSubring(tenantID string) ring.ReadRing { - shardSize := s.limits.BloomCompactorShardSize(tenantID) - - // A shard size of 0 means shuffle sharding is disabled for this specific user, - // so we just return the full ring so that blocks will be sharded across all compactors. 
- if shardSize <= 0 { - return s.ring - } - - return s.ring.ShuffleShard(tenantID, shardSize) -} - -func (s *ShuffleShardingStrategy) OwnsTenant(tenantID string) (bool, error) { - subRing := s.getShuffleShardingSubring(tenantID) - return subRing.HasInstance(s.ringLifeCycler.GetInstanceID()), nil -} - // OwnsJob makes sure only a single compactor should execute the job. -// TODO: Pretty similar to sharding strategy in pkg/bloomgateway/sharding.go func (s *ShuffleShardingStrategy) OwnsJob(job Job) (bool, error) { - // We check again if we own the tenant - subRing := s.getShuffleShardingSubring(job.Tenant()) - ownsTenant := subRing.HasInstance(s.ringLifeCycler.GetInstanceID()) - if !ownsTenant { + if !s.OwnsTenant(job.Tenant()) { return false, nil } - rs, err := subRing.Get(uint32(job.Fingerprint()), RingOp, nil, nil, nil) - if err != nil { - return false, err - } - - return rs.Includes(s.ringLifeCycler.GetInstanceAddr()), nil + tenantRing := s.GetTenantSubRing(job.Tenant()) + fpSharding := util_ring.NewFingerprintShuffleSharding(tenantRing, s.ringLifeCycler, RingOp) + return fpSharding.OwnsFingerprint(uint64(job.Fingerprint())) } diff --git a/pkg/bloomgateway/sharding.go b/pkg/bloomgateway/sharding.go index 95cf4f05ab3a6..598a2046d1e01 100644 --- a/pkg/bloomgateway/sharding.go +++ b/pkg/bloomgateway/sharding.go @@ -5,6 +5,7 @@ import ( "github.com/go-kit/log" "github.com/grafana/dskit/ring" + util_ring "github.com/grafana/loki/pkg/util/ring" ) // TODO(chaudum): Replace this placeholder with actual BlockRef struct. @@ -45,20 +46,17 @@ type ShardingStrategy interface { } type ShuffleShardingStrategy struct { - r ring.ReadRing - limits Limits - instanceAddr string - instanceID string - logger log.Logger + util_ring.TenantSharding + r ring.ReadRing + ringLifeCycler *ring.BasicLifecycler + logger log.Logger } -func NewShuffleShardingStrategy(r ring.ReadRing, l Limits, instanceAddr, instanceID string, logger log.Logger) *ShuffleShardingStrategy { +func NewShuffleShardingStrategy(r ring.ReadRing, ringLifecycler *ring.BasicLifecycler, limits Limits, logger log.Logger) *ShuffleShardingStrategy { return &ShuffleShardingStrategy{ - r: r, - limits: l, - instanceAddr: instanceAddr, - instanceID: instanceID, - logger: logger, + TenantSharding: util_ring.NewTenantShuffleSharding(r, ringLifecycler, limits.BloomGatewayShardSize), + ringLifeCycler: ringLifecycler, + logger: logger, } } @@ -69,17 +67,15 @@ func (s *ShuffleShardingStrategy) FilterTenants(_ context.Context, tenantIDs []s // instance, because of the auto-forget feature. if set, err := s.r.GetAllHealthy(BlocksOwnerSync); err != nil { return nil, err - } else if !set.Includes(s.instanceAddr) { + } else if !set.Includes(s.ringLifeCycler.GetInstanceID()) { return nil, errGatewayUnhealthy } var filteredIDs []string for _, tenantID := range tenantIDs { - subRing := GetShuffleShardingSubring(s.r, tenantID, s.limits) - // Include the user only if it belongs to this bloom gateway shard. - if subRing.HasInstance(s.instanceID) { + if s.OwnsTenant(tenantID) { filteredIDs = append(filteredIDs, tenantID) } } @@ -94,35 +90,35 @@ func getBucket(rangeMin, rangeMax, pos uint64) int { // FilterBlocks implements ShardingStrategy. 
func (s *ShuffleShardingStrategy) FilterBlocks(_ context.Context, tenantID string, blockRefs []BlockRef) ([]BlockRef, error) { - filteredBlockRefs := make([]BlockRef, 0, len(blockRefs)) + if !s.OwnsTenant(tenantID) { + return nil, nil + } - subRing := GetShuffleShardingSubring(s.r, tenantID, s.limits) + filteredBlockRefs := make([]BlockRef, 0, len(blockRefs)) - bufDescs, bufHosts, bufZones := ring.MakeBuffersForGet() - var rs ring.ReplicationSet - var err error + tenantRing := s.GetTenantSubRing(tenantID) + fpSharding := util_ring.NewFingerprintShuffleSharding(tenantRing, s.ringLifeCycler, BlocksOwnerSync) for _, blockRef := range blockRefs { - rs, err = subRing.Get(uint32(blockRef.FromFp), BlocksOwnerSync, bufDescs, bufHosts, bufZones) + owns, err := fpSharding.OwnsFingerprint(blockRef.FromFp) if err != nil { return nil, err } - // Include the block only if it belongs to this bloom gateway shard. - if rs.Includes(s.instanceID) { + if owns { filteredBlockRefs = append(filteredBlockRefs, blockRef) continue } - rs, err = subRing.Get(uint32(blockRef.ThroughFp), BlocksOwnerSync, bufDescs, bufHosts, bufZones) + owns, err = fpSharding.OwnsFingerprint(blockRef.ThroughFp) if err != nil { return nil, err } - // Include the block only if it belongs to this bloom gateway shard. - if rs.Includes(s.instanceID) { + if owns { filteredBlockRefs = append(filteredBlockRefs, blockRef) continue } } + return filteredBlockRefs, nil } diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index f23c2222f501c..a6d875d7ffedb 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -1253,9 +1253,7 @@ func (t *Loki) addCompactorMiddleware(h http.HandlerFunc) http.Handler { func (t *Loki) initBloomGateway() (services.Service, error) { logger := log.With(util_log.Logger, "component", "bloom-gateway") - instanceAddr := t.bloomGatewayRingManager.RingLifecycler.GetInstanceAddr() - instanceID := t.bloomGatewayRingManager.RingLifecycler.GetInstanceID() - shuffleSharding := bloomgateway.NewShuffleShardingStrategy(t.bloomGatewayRingManager.Ring, t.Overrides, instanceAddr, instanceID, logger) + shuffleSharding := bloomgateway.NewShuffleShardingStrategy(t.bloomGatewayRingManager.Ring, t.bloomGatewayRingManager.RingLifecycler, t.Overrides, logger) gateway, err := bloomgateway.New(t.Cfg.BloomGateway, t.Cfg.SchemaConfig, t.Cfg.StorageConfig, shuffleSharding, t.clientMetrics, logger, prometheus.DefaultRegisterer) if err != nil { diff --git a/pkg/util/ring/sharding.go b/pkg/util/ring/sharding.go new file mode 100644 index 0000000000000..cb549ec02bb90 --- /dev/null +++ b/pkg/util/ring/sharding.go @@ -0,0 +1,85 @@ +package ring + +import ( + "github.com/grafana/dskit/ring" + "github.com/prometheus/common/model" +) + +type TenantSharding interface { + GetTenantSubRing(tenantID string) ring.ReadRing + OwnsTenant(tenantID string) bool +} + +type TenantShuffleSharding struct { + r ring.ReadRing + ringLifeCycler *ring.BasicLifecycler + shardSizeForTenant func(tenantID string) int +} + +func NewTenantShuffleSharding( + r ring.ReadRing, + ringLifeCycler *ring.BasicLifecycler, + shardSizeForTenant func(tenantID string) int, +) *TenantShuffleSharding { + return &TenantShuffleSharding{ + r: r, + ringLifeCycler: ringLifeCycler, + shardSizeForTenant: shardSizeForTenant, + } +} + +func (s *TenantShuffleSharding) GetTenantSubRing(tenantID string) ring.ReadRing { + shardSize := s.shardSizeForTenant(tenantID) + + // A shard size of 0 means shuffle sharding is disabled for this specific user, + if shardSize <= 0 { + return s.r + } + + return 
s.r.ShuffleShard(tenantID, shardSize) +} + +func (s *TenantShuffleSharding) OwnsTenant(tenantID string) bool { + subRing := s.GetTenantSubRing(tenantID) + return subRing.HasInstance(s.ringLifeCycler.GetInstanceID()) +} + +type FingerprintSharding interface { + OwnsFingerprint(fp model.Fingerprint) (bool, error) +} + +// FingerprintShuffleSharding is not thread-safe. +type FingerprintShuffleSharding struct { + r ring.ReadRing + ringLifeCycler *ring.BasicLifecycler + ringOp ring.Operation + + // Buffers for ring.Get() calls. + bufDescs []ring.InstanceDesc + bufHosts, bufZones []string +} + +func NewFingerprintShuffleSharding( + r ring.ReadRing, + ringLifeCycler *ring.BasicLifecycler, + ringOp ring.Operation, +) *FingerprintShuffleSharding { + s := FingerprintShuffleSharding{ + r: r, + ringLifeCycler: ringLifeCycler, + ringOp: ringOp, + } + + s.bufDescs, s.bufHosts, s.bufZones = ring.MakeBuffersForGet() + + return &s +} + +func (s *FingerprintShuffleSharding) OwnsFingerprint(fp uint64) (bool, error) { + rs, err := s.r.Get(uint32(fp), s.ringOp, s.bufDescs, s.bufHosts, s.bufZones) + if err != nil { + return false, err + } + + return rs.Includes(s.ringLifeCycler.GetInstanceAddr()), nil +} From 8172c2238a9aa10af1bb17c841111072615de11a Mon Sep 17 00:00:00 2001 From: Salva Corts Date: Wed, 8 Nov 2023 15:54:12 +0100 Subject: [PATCH 09/15] WIP CR feedback --- pkg/bloomcompactor/bloomcompactor.go | 58 +++++++++------------------- pkg/bloomcompactor/config.go | 8 ++-- 2 files changed, 22 insertions(+), 44 deletions(-) diff --git a/pkg/bloomcompactor/bloomcompactor.go b/pkg/bloomcompactor/bloomcompactor.go index d93e430d93e52..cfc2dcce760a8 100644 --- a/pkg/bloomcompactor/bloomcompactor.go +++ b/pkg/bloomcompactor/bloomcompactor.go @@ -29,7 +29,6 @@ import ( "encoding/binary" "fmt" "math" - "math/rand" "os" "path/filepath" "sort" @@ -215,13 +214,7 @@ func (c *Compactor) stopping(_ error) error { } func (c *Compactor) runCompaction(ctx context.Context) error { - var ( - tables []string - // it possible for two periods to use the same storage bucket and path prefix (different indexType or schema version) - // so more than one index storage client may end up listing the same set of buckets - // avoid including the same table twice in the compact tables list. - seen = make(map[string]struct{}) - ) + var tables []string for _, sc := range c.storeClients { // refresh index list cache since previous compaction would have changed the index files in the object store sc.index.RefreshIndexTableNamesCache(ctx) @@ -229,24 +222,13 @@ func (c *Compactor) runCompaction(ctx context.Context) error { if err != nil { return fmt.Errorf("failed to list tables: %w", err) } - - for _, table := range tbls { - if _, ok := seen[table]; ok { - continue - } - - tables = append(tables, table) - seen[table] = struct{}{} - } + tables = append(tables, tbls...) 
} // process most recent tables first - sortTablesByRange(tables) + tables = filterAndSortTablesByRange(tables, c.cfg.MaxTableAge) - // apply passed in compaction limits - if c.cfg.SkipLatestNTables <= len(tables) { - tables = tables[c.cfg.SkipLatestNTables:] - } + // Filter most-recent TablesToCompact tables if c.cfg.TablesToCompact > 0 && c.cfg.TablesToCompact < len(tables) { tables = tables[:c.cfg.TablesToCompact] } @@ -296,13 +278,6 @@ func (c *Compactor) compactTable(ctx context.Context, tableName string) error { // See: https://github.com/grafana/mimir/blob/34852137c332d4050e53128481f4f6417daee91e/pkg/compactor/compactor.go#L566-L689 func (c *Compactor) compactUsers(ctx context.Context, sc storeClient, tableName string, tenants []string) error { - // When starting multiple compactor replicas nearly at the same time, running in a cluster with - // a large number of tenants, we may end up in a situation where the 1st user is compacted by - // multiple replicas at the same time. Shuffling users helps reduce the likelihood this will happen. - rand.Shuffle(len(tenants), func(i, j int) { - tenants[i], tenants[j] = tenants[j], tenants[i] - }) - // Keep track of tenants owned by this shard, so that we can delete the local files for all other users. errs := multierror.New() ownedTenants := make(map[string]struct{}, len(tenants)) @@ -315,13 +290,7 @@ func (c *Compactor) compactUsers(ctx context.Context, sc storeClient, tableName } // Ensure the tenant ID belongs to our shard. - owned, err := c.sharding.OwnsTenant(tenant) - if err != nil { - c.metrics.compactionRunSkippedTenants.Inc() - level.Warn(logger).Log("msg", "unable to check if tenant is owned by this shard", "err", err) - continue - } - if !owned { + if !c.sharding.OwnsTenant(tenant) { c.metrics.compactionRunSkippedTenants.Inc() level.Debug(logger).Log("msg", "skipping tenant because it is not owned by this shard") continue @@ -641,18 +610,27 @@ func (c *Compactor) runCompact(ctx context.Context, bloomShipperClient bloomship return nil } -// TODO: comes from pkg/compactor/compactor.go -func sortTablesByRange(tables []string) { - tableRanges := make(map[string]model.Interval) +// filterAndSortTablesByRange returns most-recent first sorted list of tables that are within the max age. 
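// For example (illustrative, assuming daily index tables): a table named index_19670
// maps via retention.ExtractIntervalFromTableName to the UTC day starting 19670*24h
// after the Unix epoch (2023-11-09), and with the default maxAge of 168h any table
// whose interval starts more than seven days before now is filtered out of the
// returned slice.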
+func filterAndSortTablesByRange(tables []string, maxAge time.Duration) []string { + tableRanges := make(map[string]model.Interval, len(tables)) + tablesToSort := make([]string, 0, len(tables)) + maxAgeTime := model.Now().Add(-maxAge) for _, table := range tables { + interval := retention.ExtractIntervalFromTableName(table) + if maxAge > 0 && interval.Start.Before(maxAgeTime) { + continue + } + tableRanges[table] = retention.ExtractIntervalFromTableName(table) + tablesToSort = append(tablesToSort, table) } - sort.Slice(tables, func(i, j int) bool { + sort.Slice(tablesToSort, func(i, j int) bool { // less than if start time is after produces a most recent first sort order return tableRanges[tables[i]].Start.After(tableRanges[tables[j]].Start) }) + return tablesToSort } // TODO: comes from pkg/compactor/compactor.go diff --git a/pkg/bloomcompactor/config.go b/pkg/bloomcompactor/config.go index 94314f7205d9c..214bf24e706e4 100644 --- a/pkg/bloomcompactor/config.go +++ b/pkg/bloomcompactor/config.go @@ -23,9 +23,9 @@ type Config struct { RetryMaxBackoff time.Duration `yaml:"compaction_retries_max_backoff"` CompactionRetries int `yaml:"compaction_retries"` - TablesToCompact int `yaml:"tables_to_compact"` - SkipLatestNTables int `yaml:"skip_latest_n_tables"` - MaxCompactionParallelism int `yaml:"max_compaction_parallelism"` + TablesToCompact int `yaml:"tables_to_compact"` + MaxTableAge time.Duration `yaml:"max_table_age"` + MaxCompactionParallelism int `yaml:"max_compaction_parallelism"` } // RegisterFlags registers flags for the Bloom-Compactor configuration. @@ -38,7 +38,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.DurationVar(&cfg.RetryMaxBackoff, "bloom-compactor.compaction-retries-max-backoff", time.Minute, "Maximum backoff time between retries.") f.IntVar(&cfg.CompactionRetries, "bloom-compactor.compaction-retries", 3, "Number of retries to perform when compaction fails.") f.IntVar(&cfg.TablesToCompact, "bloom-compactor.tables-to-compact", 0, "Number of tables that compactor will try to compact. Newer tables are chosen when this is less than the number of tables available.") - f.IntVar(&cfg.SkipLatestNTables, "bloom-compactor.skip-latest-n-tables", 0, "Do not compact N latest tables.") + f.DurationVar(&cfg.MaxTableAge, "bloom-compactor.max-table-age", 7*24*time.Hour, "Do not compact tables older than the the configured time. Default to 7 days. 0s means no limit") f.IntVar(&cfg.MaxCompactionParallelism, "bloom-compactor.max-compaction-parallelism", 1, "Maximum number of tables to compact in parallel. 
While increasing this value, please make sure compactor has enough disk space allocated to be able to store and compact as many tables.") } From 7f5bd55ff6a481cfc0a49b85450ffa2d65658277 Mon Sep 17 00:00:00 2001 From: Salva Corts Date: Wed, 8 Nov 2023 16:13:35 +0100 Subject: [PATCH 10/15] Remove Series struct in favor of Job --- pkg/bloomcompactor/bloomcompactor.go | 71 +++++++--------------------- pkg/bloomcompactor/job.go | 51 +++++++++++++++++--- 2 files changed, 63 insertions(+), 59 deletions(-) diff --git a/pkg/bloomcompactor/bloomcompactor.go b/pkg/bloomcompactor/bloomcompactor.go index cfc2dcce760a8..bcf1a5a6cae9b 100644 --- a/pkg/bloomcompactor/bloomcompactor.go +++ b/pkg/bloomcompactor/bloomcompactor.go @@ -359,7 +359,7 @@ func (c *Compactor) compactTenant(ctx context.Context, sc storeClient, tableName return } - if err := c.runCompact(ctx, c.bloomShipperClient, bt, sc, job); err != nil { + if err := c.runCompact(ctx, job, c.bloomShipperClient, bt, sc); err != nil { c.metrics.compactionRunFailedJobs.Inc() errs.Add(errors.Wrap(err, "runBloomCompact")) return @@ -417,15 +417,6 @@ func (c *Compactor) compactTenantWithRetries(ctx context.Context, sc storeClient ) } -type Series struct { // TODO this can be replaced with Job struct based on Salva's ring work. - tableName, tenant string - labels labels.Labels - fingerPrint model.Fingerprint - chunks []chunk.Chunk - from, through model.Time - indexPath string -} - func makeChunkRefs(chksMetas []tsdbindex.ChunkMeta, tenant string, fp model.Fingerprint) []chunk.Chunk { chunkRefs := make([]chunk.Chunk, 0, len(chksMetas)) for _, chk := range chksMetas { @@ -444,8 +435,8 @@ func makeChunkRefs(chksMetas []tsdbindex.ChunkMeta, tenant string, fp model.Fing } // TODO Revisit this step once v1/bloom lib updated to combine blooms in the same series -func buildBloomBlock(bloomForChks v1.SeriesWithBloom, series Series, workingDir string) (bloomshipper.Block, error) { - localDst := createLocalDirName(workingDir, series) +func buildBloomBlock(bloomForChks v1.SeriesWithBloom, job Job, workingDir string) (bloomshipper.Block, error) { + localDst := createLocalDirName(workingDir, job) // write bloom to a local dir builder, err := v1.NewBlockBuilder(v1.NewBlockOptions(), v1.NewDirectoryBlockWriter(localDst)) @@ -482,15 +473,15 @@ func buildBloomBlock(bloomForChks v1.SeriesWithBloom, series Series, workingDir blocks := bloomshipper.Block{ BlockRef: bloomshipper.BlockRef{ Ref: bloomshipper.Ref{ - TenantID: series.tenant, - TableName: series.tableName, - MinFingerprint: uint64(series.fingerPrint), // TODO will change once we compact multiple blooms into a block - MaxFingerprint: uint64(series.fingerPrint), - StartTimestamp: series.from.Unix(), - EndTimestamp: series.through.Unix(), + TenantID: job.Tenant(), + TableName: job.TableName(), + MinFingerprint: uint64(job.Fingerprint()), // TODO will change once we compact multiple blooms into a block + MaxFingerprint: uint64(job.Fingerprint()), + StartTimestamp: job.From().Unix(), + EndTimestamp: job.Through().Unix(), Checksum: binary.BigEndian.Uint32(checksum), }, - IndexPath: series.indexPath, + IndexPath: job.IndexPath(), }, Data: blockFile, } @@ -498,16 +489,16 @@ func buildBloomBlock(bloomForChks v1.SeriesWithBloom, series Series, workingDir return blocks, nil } -func createLocalDirName(workingDir string, series Series) string { - dir := fmt.Sprintf("bloomBlock-%s-%s-%s-%s-%s-%s", series.tableName, series.tenant, series.fingerPrint, series.fingerPrint, series.from, series.through) +func 
createLocalDirName(workingDir string, job Job) string { + dir := fmt.Sprintf("bloomBlock-%s-%s-%s-%s-%s-%s", job.TableName(), job.Tenant(), job.Fingerprint(), job.Fingerprint(), job.From(), job.Through()) return filepath.Join(workingDir, dir) } -func CompactNewChunks(ctx context.Context, series Series, bt *v1.BloomTokenizer, bloomShipperClient bloomshipper.Client, dst string) (err error) { +func CompactNewChunks(ctx context.Context, job Job, chunks []chunk.Chunk, bt *v1.BloomTokenizer, bloomShipperClient bloomshipper.Client, dst string) (err error) { // Create a bloom for this series bloomForChks := v1.SeriesWithBloom{ Series: &v1.Series{ - Fingerprint: series.fingerPrint, + Fingerprint: job.Fingerprint(), }, Bloom: &v1.Bloom{ ScalableBloomFilter: *filter.NewDefaultScalableBloomFilter(fpRate), @@ -515,10 +506,10 @@ func CompactNewChunks(ctx context.Context, series Series, bt *v1.BloomTokenizer, } // Tokenize data into n-grams - bt.PopulateSeriesWithBloom(&bloomForChks, series.chunks) + bt.PopulateSeriesWithBloom(&bloomForChks, chunks) // Build and upload bloomBlock to storage - blocks, err := buildBloomBlock(bloomForChks, series, dst) + blocks, err := buildBloomBlock(bloomForChks, job, dst) if err != nil { level.Info(util_log.Logger).Log("building bloomBlocks", err) return @@ -548,7 +539,7 @@ func CompactNewChunks(ctx context.Context, series Series, bt *v1.BloomTokenizer, return nil } -func (c *Compactor) runCompact(ctx context.Context, bloomShipperClient bloomshipper.Client, bt *v1.BloomTokenizer, storeClient storeClient, job Job) error { +func (c *Compactor) runCompact(ctx context.Context, job Job, bloomShipperClient bloomshipper.Client, bt *v1.BloomTokenizer, storeClient storeClient) error { // TODO call bloomShipperClient.GetMetas to get existing meta.json var metas []bloomshipper.Meta @@ -562,33 +553,7 @@ func (c *Compactor) runCompact(ctx context.Context, bloomShipperClient bloomship return err } - // effectively get min and max of timestamps of the list of chunks in a series - // There must be a better way to get this, ordering chunkRefs by timestamp doesn't fully solve it - // chunk files name have this info in ObjectStore, but it's not really exposed - minFrom := model.Latest - maxThrough := model.Earliest - - for _, c := range chks { - if minFrom > c.From { - minFrom = c.From - } - if maxThrough < c.From { - maxThrough = c.Through - } - } - - series := Series{ - tableName: job.tableName, - tenant: job.Tenant(), - labels: job.Labels(), - fingerPrint: job.Fingerprint(), - chunks: chks, - from: minFrom, - through: maxThrough, - indexPath: job.IndexPath(), - } - - err = CompactNewChunks(ctx, series, bt, bloomShipperClient, c.cfg.WorkingDirectory) + err = CompactNewChunks(ctx, job, chks, bt, bloomShipperClient, c.cfg.WorkingDirectory) if err != nil { return err } diff --git a/pkg/bloomcompactor/job.go b/pkg/bloomcompactor/job.go index 95d716b69470a..5257be97095d8 100644 --- a/pkg/bloomcompactor/job.go +++ b/pkg/bloomcompactor/job.go @@ -12,6 +12,9 @@ type Job struct { seriesLbs labels.Labels seriesFP model.Fingerprint chunks []index.ChunkMeta + + // We compute them lazily. + from, through *model.Time } // NewJob returns a new compaction Job. 
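For illustration, a minimal test-style sketch of how the extended Job API above might be exercised; the chunk bounds, label values, and test name are assumptions rather than part of this series:

// Hypothetical sketch: exercises NewJob plus the lazily computed From/Through
// accessors introduced above.
package bloomcompactor

import (
	"testing"

	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/model/labels"
	"github.com/stretchr/testify/require"

	"github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/tsdb/index"
)

func TestJobFromThrough_sketch(t *testing.T) {
	chks := []index.ChunkMeta{
		{MinTime: 10, MaxTime: 20},
		{MinTime: 15, MaxTime: 40},
	}
	lbls := labels.FromStrings("namespace", "dev")

	job := NewJob("tenant-a", "table_19670", "path/to/index", model.Fingerprint(lbls.Hash()), lbls, chks)

	// From/Through are derived from the chunk bounds on first access and then cached.
	require.Equal(t, model.Time(10), job.From())
	require.Equal(t, model.Time(40), job.Through())
}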
@@ -33,26 +36,62 @@ func NewJob( } } -func (j Job) String() string { +func (j *Job) String() string { return j.tableName + "_" + j.tenantID + "_" + j.seriesFP.String() } -func (j Job) Tenant() string { +func (j *Job) TableName() string { + return j.tableName +} + +func (j *Job) Tenant() string { return j.tenantID } -func (j Job) Fingerprint() model.Fingerprint { +func (j *Job) Fingerprint() model.Fingerprint { return j.seriesFP } -func (j Job) Chunks() []index.ChunkMeta { +func (j *Job) Chunks() []index.ChunkMeta { return j.chunks } -func (j Job) Labels() labels.Labels { +func (j *Job) Labels() labels.Labels { return j.seriesLbs } -func (j Job) IndexPath() string { +func (j *Job) IndexPath() string { return j.indexPath } + +func (j *Job) From() model.Time { + if j.from == nil { + j.computeFromThrough() + } + return *j.from +} + +func (j *Job) Through() model.Time { + if j.through == nil { + j.computeFromThrough() + } + return *j.through +} + +func (j *Job) computeFromThrough() { + minFrom := model.Latest + maxThrough := model.Earliest + + for _, chunk := range j.chunks { + from, through := chunk.Bounds() + if minFrom > from { + minFrom = from + } + if maxThrough < through { + maxThrough = through + } + } + + j.from = &minFrom + j.through = &maxThrough +} From 930d21e087cf112a43125bf3c518401e130f6e79 Mon Sep 17 00:00:00 2001 From: Salva Corts Date: Wed, 8 Nov 2023 16:15:22 +0100 Subject: [PATCH 11/15] Fix tests and lint issues --- pkg/bloomcompactor/sharding.go | 1 + pkg/bloomcompactor/sharding_test.go | 6 ++---- pkg/bloomgateway/sharding.go | 1 + 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/bloomcompactor/sharding.go b/pkg/bloomcompactor/sharding.go index 10a023827167a..093c0c3ac9a31 100644 --- a/pkg/bloomcompactor/sharding.go +++ b/pkg/bloomcompactor/sharding.go @@ -2,6 +2,7 @@ package bloomcompactor import ( "github.com/grafana/dskit/ring" + util_ring "github.com/grafana/loki/pkg/util/ring" ) diff --git a/pkg/bloomcompactor/sharding_test.go b/pkg/bloomcompactor/sharding_test.go index 22eac1641e7b1..f3357a70ac256 100644 --- a/pkg/bloomcompactor/sharding_test.go +++ b/pkg/bloomcompactor/sharding_test.go @@ -70,13 +70,11 @@ func TestShuffleSharding(t *testing.T) { var ownedTenants, ownedJobs int for i := 0; i < tenants; i++ { tenant := fmt.Sprintf("tenant-%d", i) - ownsTenant, err := shard.OwnsTenant(tenant) - require.NoError(t, err) + ownsTenant := shard.OwnsTenant(tenant) var tenantOwnedByOther int for _, other := range otherShards { - otherOwns, err := other.OwnsTenant(tenant) - require.NoError(t, err) + otherOwns := other.OwnsTenant(tenant) if otherOwns { tenantOwnedByOther++ } diff --git a/pkg/bloomgateway/sharding.go b/pkg/bloomgateway/sharding.go index 598a2046d1e01..34d3c63c43b14 100644 --- a/pkg/bloomgateway/sharding.go +++ b/pkg/bloomgateway/sharding.go @@ -5,6 +5,7 @@ import ( "github.com/go-kit/log" "github.com/grafana/dskit/ring" + util_ring "github.com/grafana/loki/pkg/util/ring" ) From 8b1b3cc9f6af355d1e5378b1520d79d844e5991e Mon Sep 17 00:00:00 2001 From: Salva Corts Date: Wed, 8 Nov 2023 16:51:33 +0100 Subject: [PATCH 12/15] Update docs --- docs/sources/configure/_index.md | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/docs/sources/configure/_index.md b/docs/sources/configure/_index.md index dbdaad5490f11..a136559efc324 100644 --- a/docs/sources/configure/_index.md +++ b/docs/sources/configure/_index.md @@ -2556,9 +2556,10 @@ ring: # CLI flag: -bloom-compactor.tables-to-compact [tables_to_compact: | default = 0] -# Do not 
compact N latest tables. -# CLI flag: -bloom-compactor.skip-latest-n-tables -[skip_latest_n_tables: | default = 0] +# Do not compact tables older than the the configured time. Default to 7 days. +# 0s means no limit +# CLI flag: -bloom-compactor.max-table-age +[max_table_age: | default = 168h] # Maximum number of tables to compact in parallel. While increasing this value, # please make sure compactor has enough disk space allocated to be able to store From 8556a72332ae330c1a9208fd88901c49710cd3d8 Mon Sep 17 00:00:00 2001 From: Salva Corts Date: Fri, 10 Nov 2023 15:51:09 +0100 Subject: [PATCH 13/15] CR feedback --- pkg/bloomcompactor/bloomcompactor.go | 141 ++++++++++++++++----------- pkg/bloomcompactor/config.go | 4 +- pkg/bloomcompactor/metrics.go | 8 +- pkg/bloomcompactor/sharding_test.go | 8 ++ pkg/validation/limits.go | 18 +++- 5 files changed, 113 insertions(+), 66 deletions(-) diff --git a/pkg/bloomcompactor/bloomcompactor.go b/pkg/bloomcompactor/bloomcompactor.go index bcf1a5a6cae9b..056f2b91176c8 100644 --- a/pkg/bloomcompactor/bloomcompactor.go +++ b/pkg/bloomcompactor/bloomcompactor.go @@ -60,7 +60,6 @@ import ( "github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/tsdb" tsdbindex "github.com/grafana/loki/pkg/storage/stores/shipper/indexshipper/tsdb/index" "github.com/grafana/loki/pkg/util" - util_log "github.com/grafana/loki/pkg/util/log" ) const ( @@ -74,6 +73,7 @@ type Compactor struct { cfg Config logger log.Logger schemaCfg config.SchemaConfig + limits Limits // temporary workaround until store has implemented read/write shipper interface bloomShipperClient bloomshipper.Client @@ -108,6 +108,7 @@ func New( logger: logger, schemaCfg: schemaConfig, sharding: sharding, + limits: limits, } // Configure BloomClient for meta.json management @@ -226,38 +227,42 @@ func (c *Compactor) runCompaction(ctx context.Context) error { } // process most recent tables first - tables = filterAndSortTablesByRange(tables, c.cfg.MaxTableAge) - - // Filter most-recent TablesToCompact tables - if c.cfg.TablesToCompact > 0 && c.cfg.TablesToCompact < len(tables) { - tables = tables[:c.cfg.TablesToCompact] - } + tablesIntervals := getIntervalsForTables(tables) + sortTablesByRange(tables, tablesIntervals) parallelism := c.cfg.MaxCompactionParallelism if parallelism == 0 { parallelism = len(tables) } + // TODO(salvacorts): We currently parallelize at the table level. We may want to parallelize at the tenant and job level as well. + // To do that, we should create a worker pool with c.cfg.MaxCompactionParallelism number of workers. errs := multierror.New() _ = concurrency.ForEachJob(ctx, len(tables), parallelism, func(ctx context.Context, i int) error { tableName := tables[i] - level.Info(c.logger).Log("msg", "compacting table", "table-name", tableName) - err := c.compactTable(ctx, tableName) + logger := log.With(c.logger, "table", tableName) + level.Info(logger).Log("msg", "compacting table") + err := c.compactTable(ctx, logger, tableName, tablesIntervals[tableName]) if err != nil { errs.Add(err) return nil } - level.Info(c.logger).Log("msg", "finished compacting table", "table-name", tableName) + level.Info(logger).Log("msg", "finished compacting table") return nil }) return errs.Err() } -func (c *Compactor) compactTable(ctx context.Context, tableName string) error { +func (c *Compactor) compactTable(ctx context.Context, logger log.Logger, tableName string, tableInterval model.Interval) error { + // Ensure the context has not been canceled (ie. compactor shutdown has been triggered). 
+ if err := ctx.Err(); err != nil { + return fmt.Errorf("interrupting compaction of table: %w", err) + } + schemaCfg, ok := schemaPeriodForTable(c.schemaCfg, tableName) if !ok { - level.Error(c.logger).Log("msg", "skipping compaction since we can't find schema for table", "table", tableName) + level.Error(logger).Log("msg", "skipping compaction since we can't find schema for table") return nil } @@ -272,48 +277,61 @@ func (c *Compactor) compactTable(ctx context.Context, tableName string) error { } c.metrics.compactionRunDiscoveredTenants.Add(float64(len(tenants))) - level.Info(c.logger).Log("msg", "discovered tenants from bucket", "users", len(tenants)) - return c.compactUsers(ctx, sc, tableName, tenants) + level.Info(logger).Log("msg", "discovered tenants from bucket", "users", len(tenants)) + return c.compactUsers(ctx, logger, sc, tableName, tableInterval, tenants) } // See: https://github.com/grafana/mimir/blob/34852137c332d4050e53128481f4f6417daee91e/pkg/compactor/compactor.go#L566-L689 -func (c *Compactor) compactUsers(ctx context.Context, sc storeClient, tableName string, tenants []string) error { +func (c *Compactor) compactUsers(ctx context.Context, logger log.Logger, sc storeClient, tableName string, tableInterval model.Interval, tenants []string) error { // Keep track of tenants owned by this shard, so that we can delete the local files for all other users. errs := multierror.New() ownedTenants := make(map[string]struct{}, len(tenants)) for _, tenant := range tenants { - logger := log.With(c.logger, "tenant", tenant) + tenantLogger := log.With(logger, "tenant", tenant) // Ensure the context has not been canceled (ie. compactor shutdown has been triggered). if err := ctx.Err(); err != nil { return fmt.Errorf("interrupting compaction of tenants: %w", err) } + // Skip this table if it is too new/old for the tenant limits. + now := model.Now() + tableMinAge := c.limits.BloomCompactorMinTableAge(tenant) + tableMaxAge := c.limits.BloomCompactorMaxTableAge(tenant) + if tableMinAge > 0 && tableInterval.End.After(now.Add(-tableMinAge)) { + level.Debug(tenantLogger).Log("msg", "skipping tenant because table is too new ", "table-min-age", tableMinAge, "table-end", tableInterval.End, "now", now) + continue + } + if tableMaxAge > 0 && tableInterval.Start.Before(now.Add(-tableMaxAge)) { + level.Debug(tenantLogger).Log("msg", "skipping tenant because table is too old", "table-max-age", tableMaxAge, "table-start", tableInterval.Start, "now", now) + continue + } + // Ensure the tenant ID belongs to our shard. if !c.sharding.OwnsTenant(tenant) { c.metrics.compactionRunSkippedTenants.Inc() - level.Debug(logger).Log("msg", "skipping tenant because it is not owned by this shard") + level.Debug(tenantLogger).Log("msg", "skipping tenant because it is not owned by this shard") continue } ownedTenants[tenant] = struct{}{} - if err := c.compactTenantWithRetries(ctx, sc, tableName, tenant); err != nil { + if err := c.compactTenantWithRetries(ctx, tenantLogger, sc, tableName, tenant); err != nil { switch { case errors.Is(err, context.Canceled): // We don't want to count shutdowns as failed compactions because we will pick up with the rest of the compaction after the restart. 
- level.Info(logger).Log("msg", "compaction for tenant was interrupted by a shutdown") + level.Info(tenantLogger).Log("msg", "compaction for tenant was interrupted by a shutdown") return nil default: c.metrics.compactionRunFailedTenants.Inc() - level.Error(logger).Log("msg", "failed to compact tenant", "err", err) + level.Error(tenantLogger).Log("msg", "failed to compact tenant", "err", err) errs.Add(err) } continue } c.metrics.compactionRunSucceededTenants.Inc() - level.Info(logger).Log("msg", "successfully compacted tenant") + level.Info(tenantLogger).Log("msg", "successfully compacted tenant") } return errs.Err() @@ -321,8 +339,8 @@ func (c *Compactor) compactUsers(ctx context.Context, sc storeClient, tableName // TODO: Delete local files for unowned tenants, if there are any. } -func (c *Compactor) compactTenant(ctx context.Context, sc storeClient, tableName string, tenant string) error { - level.Info(c.logger).Log("msg", "starting compaction of tenant", "tenant", tenant) +func (c *Compactor) compactTenant(ctx context.Context, logger log.Logger, sc storeClient, tableName string, tenant string) error { + level.Info(logger).Log("msg", "starting compaction of tenant") // Ensure the context has not been canceled (ie. compactor shutdown has been triggered). if err := ctx.Err(); err != nil { @@ -345,21 +363,22 @@ func (c *Compactor) compactTenant(ctx context.Context, sc storeClient, tableName 0, math.MaxInt64, // TODO: Replace with MaxLookBackPeriod func(labels labels.Labels, fingerprint model.Fingerprint, chksMetas []tsdbindex.ChunkMeta) { job := NewJob(tenant, tableName, idx.Path(), fingerprint, labels, chksMetas) + jobLogger := log.With(logger, "job", job.String()) ownsJob, err := c.sharding.OwnsJob(job) if err != nil { - c.metrics.compactionRunSkippedJobs.Inc() - level.Error(c.logger).Log("msg", "failed to check if compactor owns job", "job", job, "err", err) + c.metrics.compactionRunUnownedJobs.Inc() + level.Error(jobLogger).Log("msg", "failed to check if compactor owns job", "err", err) errs.Add(err) return } if !ownsJob { - c.metrics.compactionRunSkippedJobs.Inc() - level.Debug(c.logger).Log("msg", "skipping job because it is not owned by this shard", "job", job) + c.metrics.compactionRunUnownedJobs.Inc() + level.Debug(jobLogger).Log("msg", "skipping job because it is not owned by this shard") return } - if err := c.runCompact(ctx, job, c.bloomShipperClient, bt, sc); err != nil { + if err := c.runCompact(ctx, jobLogger, job, c.bloomShipperClient, bt, sc); err != nil { c.metrics.compactionRunFailedJobs.Inc() errs.Add(errors.Wrap(err, "runBloomCompact")) return @@ -405,14 +424,14 @@ func runWithRetries( return lastErr } -func (c *Compactor) compactTenantWithRetries(ctx context.Context, sc storeClient, tableName string, tenant string) error { +func (c *Compactor) compactTenantWithRetries(ctx context.Context, logger log.Logger, sc storeClient, tableName string, tenant string) error { return runWithRetries( ctx, c.cfg.RetryMinBackoff, c.cfg.RetryMaxBackoff, c.cfg.CompactionRetries, func(ctx context.Context) error { - return c.compactTenant(ctx, sc, tableName, tenant) + return c.compactTenant(ctx, logger, sc, tableName, tenant) }, ) } @@ -435,25 +454,30 @@ func makeChunkRefs(chksMetas []tsdbindex.ChunkMeta, tenant string, fp model.Fing } // TODO Revisit this step once v1/bloom lib updated to combine blooms in the same series -func buildBloomBlock(bloomForChks v1.SeriesWithBloom, job Job, workingDir string) (bloomshipper.Block, error) { +func buildBloomBlock(ctx context.Context, logger 
log.Logger, bloomForChks v1.SeriesWithBloom, job Job, workingDir string) (bloomshipper.Block, error) { + // Ensure the context has not been canceled (ie. compactor shutdown has been triggered). + if err := ctx.Err(); err != nil { + return bloomshipper.Block{}, err + } + localDst := createLocalDirName(workingDir, job) // write bloom to a local dir builder, err := v1.NewBlockBuilder(v1.NewBlockOptions(), v1.NewDirectoryBlockWriter(localDst)) if err != nil { - level.Info(util_log.Logger).Log("creating builder", err) + level.Error(logger).Log("creating builder", err) return bloomshipper.Block{}, err } err = builder.BuildFrom(v1.NewSliceIter([]v1.SeriesWithBloom{bloomForChks})) if err != nil { - level.Info(util_log.Logger).Log("writing bloom", err) + level.Error(logger).Log("writing bloom", err) return bloomshipper.Block{}, err } blockFile, err := os.Open(filepath.Join(localDst, bloomFileName)) if err != nil { - level.Info(util_log.Logger).Log("reading bloomBlock", err) + level.Error(logger).Log("reading bloomBlock", err) } // read the checksum @@ -494,7 +518,12 @@ func createLocalDirName(workingDir string, job Job) string { return filepath.Join(workingDir, dir) } -func CompactNewChunks(ctx context.Context, job Job, chunks []chunk.Chunk, bt *v1.BloomTokenizer, bloomShipperClient bloomshipper.Client, dst string) (err error) { +func CompactNewChunks(ctx context.Context, logger log.Logger, job Job, chunks []chunk.Chunk, bt *v1.BloomTokenizer, bloomShipperClient bloomshipper.Client, dst string) (err error) { + // Ensure the context has not been canceled (ie. compactor shutdown has been triggered). + if err := ctx.Err(); err != nil { + return err + } + // Create a bloom for this series bloomForChks := v1.SeriesWithBloom{ Series: &v1.Series{ @@ -509,15 +538,15 @@ func CompactNewChunks(ctx context.Context, job Job, chunks []chunk.Chunk, bt *v1 bt.PopulateSeriesWithBloom(&bloomForChks, chunks) // Build and upload bloomBlock to storage - blocks, err := buildBloomBlock(bloomForChks, job, dst) + blocks, err := buildBloomBlock(ctx, logger, bloomForChks, job, dst) if err != nil { - level.Info(util_log.Logger).Log("building bloomBlocks", err) + level.Error(logger).Log("building bloomBlocks", err) return } storedBlocks, err := bloomShipperClient.PutBlocks(ctx, []bloomshipper.Block{blocks}) if err != nil { - level.Info(util_log.Logger).Log("putting blocks to storage", err) + level.Error(logger).Log("putting blocks to storage", err) return } @@ -532,14 +561,19 @@ func CompactNewChunks(ctx context.Context, job Job, chunks []chunk.Chunk, bt *v1 // TODO move this to an outer layer, otherwise creates a meta per block err = bloomShipperClient.PutMeta(ctx, meta) if err != nil { - level.Info(util_log.Logger).Log("putting meta.json to storage", err) + level.Error(logger).Log("putting meta.json to storage", err) return } return nil } -func (c *Compactor) runCompact(ctx context.Context, job Job, bloomShipperClient bloomshipper.Client, bt *v1.BloomTokenizer, storeClient storeClient) error { +func (c *Compactor) runCompact(ctx context.Context, logger log.Logger, job Job, bloomShipperClient bloomshipper.Client, bt *v1.BloomTokenizer, storeClient storeClient) error { + // Ensure the context has not been canceled (ie. compactor shutdown has been triggered). 
+ if err := ctx.Err(); err != nil { + return err + } + // TODO call bloomShipperClient.GetMetas to get existing meta.json var metas []bloomshipper.Meta @@ -553,7 +587,7 @@ func (c *Compactor) runCompact(ctx context.Context, job Job, bloomShipperClient return err } - err = CompactNewChunks(ctx, job, chks, bt, bloomShipperClient, c.cfg.WorkingDirectory) + err = CompactNewChunks(ctx, logger, job, chks, bt, bloomShipperClient, c.cfg.WorkingDirectory) if err != nil { return err } @@ -575,27 +609,20 @@ func (c *Compactor) runCompact(ctx context.Context, job Job, bloomShipperClient return nil } -// filterAndSortTablesByRange returns most-recent first sorted list of tables that are within the max age. -func filterAndSortTablesByRange(tables []string, maxAge time.Duration) []string { - tableRanges := make(map[string]model.Interval, len(tables)) - tablesToSort := make([]string, 0, len(tables)) - maxAgeTime := model.Now().Add(-maxAge) +func getIntervalsForTables(tables []string) map[string]model.Interval { + tablesIntervals := make(map[string]model.Interval, len(tables)) for _, table := range tables { - interval := retention.ExtractIntervalFromTableName(table) - if maxAge > 0 && interval.Start.Before(maxAgeTime) { - continue - } - - tableRanges[table] = retention.ExtractIntervalFromTableName(table) - tablesToSort = append(tablesToSort, table) + tablesIntervals[table] = retention.ExtractIntervalFromTableName(table) } - sort.Slice(tablesToSort, func(i, j int) bool { + return tablesIntervals +} + +func sortTablesByRange(tables []string, intervals map[string]model.Interval) { + sort.Slice(tables, func(i, j int) bool { // less than if start time is after produces a most recent first sort order - return tableRanges[tables[i]].Start.After(tableRanges[tables[j]].Start) + return intervals[tables[i]].Start.After(intervals[tables[j]].Start) }) - - return tablesToSort } // TODO: comes from pkg/compactor/compactor.go diff --git a/pkg/bloomcompactor/config.go b/pkg/bloomcompactor/config.go index 214bf24e706e4..b84ac4fcb0846 100644 --- a/pkg/bloomcompactor/config.go +++ b/pkg/bloomcompactor/config.go @@ -23,7 +23,6 @@ type Config struct { RetryMaxBackoff time.Duration `yaml:"compaction_retries_max_backoff"` CompactionRetries int `yaml:"compaction_retries"` - TablesToCompact int `yaml:"tables_to_compact"` MaxTableAge time.Duration `yaml:"max_table_age"` MaxCompactionParallelism int `yaml:"max_compaction_parallelism"` } @@ -37,7 +36,6 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.DurationVar(&cfg.RetryMinBackoff, "bloom-compactor.compaction-retries-min-backoff", 10*time.Second, "Minimum backoff time between retries.") f.DurationVar(&cfg.RetryMaxBackoff, "bloom-compactor.compaction-retries-max-backoff", time.Minute, "Maximum backoff time between retries.") f.IntVar(&cfg.CompactionRetries, "bloom-compactor.compaction-retries", 3, "Number of retries to perform when compaction fails.") - f.IntVar(&cfg.TablesToCompact, "bloom-compactor.tables-to-compact", 0, "Number of tables that compactor will try to compact. Newer tables are chosen when this is less than the number of tables available.") f.DurationVar(&cfg.MaxTableAge, "bloom-compactor.max-table-age", 7*24*time.Hour, "Do not compact tables older than the the configured time. Default to 7 days. 0s means no limit") f.IntVar(&cfg.MaxCompactionParallelism, "bloom-compactor.max-compaction-parallelism", 1, "Maximum number of tables to compact in parallel. 
While increasing this value, please make sure compactor has enough disk space allocated to be able to store and compact as many tables.") } @@ -45,4 +43,6 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { type Limits interface { downloads.Limits BloomCompactorShardSize(tenantID string) int + BloomCompactorMaxTableAge(tenantID string) time.Duration + BloomCompactorMinTableAge(tenantID string) time.Duration } diff --git a/pkg/bloomcompactor/metrics.go b/pkg/bloomcompactor/metrics.go index 73c4d3865541e..0d8ed23c54ed1 100644 --- a/pkg/bloomcompactor/metrics.go +++ b/pkg/bloomcompactor/metrics.go @@ -18,7 +18,7 @@ type metrics struct { compactionRunSkippedTenants prometheus.Counter compactionRunSucceededTenants prometheus.Counter compactionRunFailedTenants prometheus.Counter - compactionRunSkippedJobs prometheus.Counter + compactionRunUnownedJobs prometheus.Counter compactionRunSucceededJobs prometheus.Counter compactionRunFailedJobs prometheus.Counter compactionRunInterval prometheus.Gauge @@ -69,11 +69,11 @@ func newMetrics(r prometheus.Registerer) *metrics { Name: "tenants_failed", Help: "Number of tenants failed processing during the current compaction run", }), - compactionRunSkippedJobs: promauto.With(r).NewCounter(prometheus.CounterOpts{ + compactionRunUnownedJobs: promauto.With(r).NewCounter(prometheus.CounterOpts{ Namespace: metricsNamespace, Subsystem: metricsSubsystem, - Name: "jobs_skipped", - Help: "Number of jobs skipped during the current compaction run", + Name: "jobs_unowned", + Help: "Number of unowned jobs skipped during the current compaction run", }), compactionRunSucceededJobs: promauto.With(r).NewCounter(prometheus.CounterOpts{ Namespace: metricsNamespace, diff --git a/pkg/bloomcompactor/sharding_test.go b/pkg/bloomcompactor/sharding_test.go index f3357a70ac256..d99f883dd3bbb 100644 --- a/pkg/bloomcompactor/sharding_test.go +++ b/pkg/bloomcompactor/sharding_test.go @@ -135,3 +135,11 @@ type mockLimits struct { func (m mockLimits) BloomCompactorShardSize(_ string) int { return m.bloomCompactorShardSize } + +func (m mockLimits) BloomCompactorMaxTableAge(_ string) time.Duration { + return 0 +} + +func (m mockLimits) BloomCompactorMinTableAge(_ string) time.Duration { + return 0 +} diff --git a/pkg/validation/limits.go b/pkg/validation/limits.go index 4447489fe75f1..afd14ea200e30 100644 --- a/pkg/validation/limits.go +++ b/pkg/validation/limits.go @@ -181,9 +181,11 @@ type Limits struct { RequiredLabels []string `yaml:"required_labels,omitempty" json:"required_labels,omitempty" doc:"description=Define a list of required selector labels."` RequiredNumberLabels int `yaml:"minimum_labels_number,omitempty" json:"minimum_labels_number,omitempty" doc:"description=Minimum number of label matchers a query should contain."` - IndexGatewayShardSize int `yaml:"index_gateway_shard_size" json:"index_gateway_shard_size"` - BloomGatewayShardSize int `yaml:"bloom_gateway_shard_size" json:"bloom_gateway_shard_size"` - BloomCompactorShardSize int `yaml:"bloom_compactor_shard_size" json:"bloom_compactor_shard_size"` + IndexGatewayShardSize int `yaml:"index_gateway_shard_size" json:"index_gateway_shard_size"` + BloomGatewayShardSize int `yaml:"bloom_gateway_shard_size" json:"bloom_gateway_shard_size"` + BloomCompactorShardSize int `yaml:"bloom_compactor_shard_size" json:"bloom_compactor_shard_size"` + BloomCompactorMaxTableAge time.Duration `yaml:"bloom_compactor_max_table_age" json:"bloom_compactor_max_table_age"` + BloomCompactorMinTableAge time.Duration 
`yaml:"bloom_compactor_min_table_age" json:"bloom_compactor_min_table_age"` AllowStructuredMetadata bool `yaml:"allow_structured_metadata,omitempty" json:"allow_structured_metadata,omitempty" doc:"description=Allow user to send structured metadata in push payload."` MaxStructuredMetadataSize flagext.ByteSize `yaml:"max_structured_metadata_size" json:"max_structured_metadata_size" doc:"description=Maximum size accepted for structured metadata per log line."` @@ -299,6 +301,8 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { f.IntVar(&l.IndexGatewayShardSize, "index-gateway.shard-size", 0, "The shard size defines how many index gateways should be used by a tenant for querying. If the global shard factor is 0, the global shard factor is set to the deprecated -replication-factor for backwards compatibility reasons.") f.IntVar(&l.BloomGatewayShardSize, "bloom-gateway.shard-size", 1, "The shard size defines how many bloom gateways should be used by a tenant for querying.") f.IntVar(&l.BloomCompactorShardSize, "bloom-compactor.shard-size", 1, "The shard size defines how many bloom compactors should be used by a tenant when computing blooms. If it's set to 0, shuffle sharding is disabled.") + f.DurationVar(&l.BloomCompactorMaxTableAge, "bloom-compactor.max-table-age", 7*24*time.Hour, "The maximum age of a table before it is compacted. Do not compact tables older than the the configured time. Default to 7 days. 0s means no limit.") + f.DurationVar(&l.BloomCompactorMinTableAge, "bloom-compactor.min-table-age", 1*time.Hour, "The minimum age of a table before it is compacted. Do not compact tables newer than the the configured time. Default to 1 hour. 0s means no limit. This is useful to avoid compacting tables that will be updated with out-of-order writes.") l.ShardStreams = &shardstreams.Config{} l.ShardStreams.RegisterFlagsWithPrefix("shard-streams", f) @@ -793,6 +797,14 @@ func (o *Overrides) BloomCompactorShardSize(userID string) int { return o.getOverridesForUser(userID).BloomCompactorShardSize } +func (o *Overrides) BloomCompactorMaxTableAge(userID string) time.Duration { + return o.getOverridesForUser(userID).BloomCompactorMaxTableAge +} + +func (o *Overrides) BloomCompactorMinTableAge(userID string) time.Duration { + return o.getOverridesForUser(userID).BloomCompactorMinTableAge +} + func (o *Overrides) AllowStructuredMetadata(userID string) bool { return o.getOverridesForUser(userID).AllowStructuredMetadata } From 6b7d7a3aa2a47090058049bdac98a8b4a898b6df Mon Sep 17 00:00:00 2001 From: Salva Corts Date: Fri, 10 Nov 2023 16:00:04 +0100 Subject: [PATCH 14/15] Fixes after merge --- pkg/bloomcompactor/bloomcompactor_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/bloomcompactor/bloomcompactor_test.go b/pkg/bloomcompactor/bloomcompactor_test.go index a3947439a5fde..65c6779750320 100644 --- a/pkg/bloomcompactor/bloomcompactor_test.go +++ b/pkg/bloomcompactor/bloomcompactor_test.go @@ -36,7 +36,7 @@ const ( func TestCompactor_RunCompaction(t *testing.T) { servercfg := &server.Config{} require.Nil(t, servercfg.LogLevel.Set("debug")) - util_log.InitLogger(servercfg, nil, true, false) + util_log.InitLogger(servercfg, nil, false) tempDir := t.TempDir() indexDir := filepath.Join(tempDir, "index") From 34bc1ff65a63ddab806f06073285691dee949de1 Mon Sep 17 00:00:00 2001 From: Salva Corts Date: Fri, 10 Nov 2023 16:07:41 +0100 Subject: [PATCH 15/15] More fixes --- docs/sources/configure/_index.md | 22 ++++++++++++---------- pkg/bloomcompactor/config.go | 4 +--- 2 files changed, 
 13 insertions(+), 13 deletions(-)

diff --git a/docs/sources/configure/_index.md b/docs/sources/configure/_index.md
index 0d2e91375ca7d..dcf0f4d502cdd 100644
--- a/docs/sources/configure/_index.md
+++ b/docs/sources/configure/_index.md
@@ -2547,16 +2547,6 @@ ring:
 # CLI flag: -bloom-compactor.compaction-retries
 [compaction_retries: <int> | default = 3]
 
-# Number of tables that compactor will try to compact. Newer tables are chosen
-# when this is less than the number of tables available.
-# CLI flag: -bloom-compactor.tables-to-compact
-[tables_to_compact: <int> | default = 0]
-
-# Do not compact tables older than the the configured time. Default to 7 days.
-# 0s means no limit
-# CLI flag: -bloom-compactor.max-table-age
-[max_table_age: <duration> | default = 168h]
-
 # Maximum number of tables to compact in parallel. While increasing this value,
 # please make sure compactor has enough disk space allocated to be able to store
 # and compact as many tables.
@@ -2953,6 +2943,18 @@ shard_streams:
 # CLI flag: -bloom-compactor.shard-size
 [bloom_compactor_shard_size: <int> | default = 1]
 
+# The maximum age of a table before it is compacted. Do not compact tables older
+# than the configured time. Defaults to 7 days. 0s means no limit.
+# CLI flag: -bloom-compactor.max-table-age
+[bloom_compactor_max_table_age: <duration> | default = 168h]
+
+# The minimum age of a table before it is compacted. Do not compact tables newer
+# than the configured time. Defaults to 1 hour. 0s means no limit. This is
+# useful to avoid compacting tables that will be updated with out-of-order
+# writes.
+# CLI flag: -bloom-compactor.min-table-age
+[bloom_compactor_min_table_age: <duration> | default = 1h]
+
 # Allow user to send structured metadata in push payload.
 # CLI flag: -validation.allow-structured-metadata
 [allow_structured_metadata: <boolean> | default = false]
diff --git a/pkg/bloomcompactor/config.go b/pkg/bloomcompactor/config.go
index b84ac4fcb0846..b87aa4e894918 100644
--- a/pkg/bloomcompactor/config.go
+++ b/pkg/bloomcompactor/config.go
@@ -23,8 +23,7 @@ type Config struct {
 	RetryMaxBackoff time.Duration `yaml:"compaction_retries_max_backoff"`
 	CompactionRetries int `yaml:"compaction_retries"`
 
-	MaxTableAge time.Duration `yaml:"max_table_age"`
-	MaxCompactionParallelism int `yaml:"max_compaction_parallelism"`
+	MaxCompactionParallelism int `yaml:"max_compaction_parallelism"`
 }
 
 // RegisterFlags registers flags for the Bloom-Compactor configuration.
@@ -36,7 +35,6 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
 	f.DurationVar(&cfg.RetryMinBackoff, "bloom-compactor.compaction-retries-min-backoff", 10*time.Second, "Minimum backoff time between retries.")
 	f.DurationVar(&cfg.RetryMaxBackoff, "bloom-compactor.compaction-retries-max-backoff", time.Minute, "Maximum backoff time between retries.")
 	f.IntVar(&cfg.CompactionRetries, "bloom-compactor.compaction-retries", 3, "Number of retries to perform when compaction fails.")
-	f.DurationVar(&cfg.MaxTableAge, "bloom-compactor.max-table-age", 7*24*time.Hour, "Do not compact tables older than the the configured time. Default to 7 days. 0s means no limit")
 	f.IntVar(&cfg.MaxCompactionParallelism, "bloom-compactor.max-compaction-parallelism", 1, "Maximum number of tables to compact in parallel. While increasing this value, please make sure compactor has enough disk space allocated to be able to store and compact as many tables.")
 }
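
Not part of the patches above: a minimal, illustrative `limits_config` sketch showing how the per-tenant bloom-compactor settings introduced in this series fit together. The values shown are simply the documented defaults; adjust them to your retention and ingestion patterns.

```yaml
limits_config:
  # Number of bloom compactors that build blooms for a tenant; 0 disables shuffle sharding.
  bloom_compactor_shard_size: 1
  # Skip tables older than 7 days (0s removes the upper bound).
  bloom_compactor_max_table_age: 168h
  # Skip tables newer than 1 hour so tables that may still receive out-of-order
  # writes are left alone (0s removes the lower bound).
  bloom_compactor_min_table_age: 1h
```

Because these are tenant limits, they can also be set per tenant through Loki's usual runtime-overrides mechanism; the `BloomCompactorMaxTableAge`/`BloomCompactorMinTableAge` accessors added to `Overrides` are what `compactUsers` consults when deciding whether a table is too new or too old for a given tenant.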