(Blooms) Add metrics to compactor (#11966)
salvacorts authored Feb 15, 2024
1 parent 443720f commit 543aaab
Showing 5 changed files with 135 additions and 66 deletions.
21 changes: 17 additions & 4 deletions pkg/bloomcompactor/bloomcompactor.go
@@ -109,9 +109,7 @@ func New(
c.logger,
)

c.metrics.compactionRunInterval.Set(cfg.CompactionInterval.Seconds())
c.Service = services.NewBasicService(c.starting, c.running, c.stopping)

return c, nil
}

@@ -138,11 +136,17 @@ func (c *Compactor) running(ctx context.Context) error {
case <-ctx.Done():
return ctx.Err()

case <-ticker.C:
case start := <-ticker.C:
c.metrics.compactionsStarted.Inc()
if err := c.runOne(ctx); err != nil {
level.Error(c.logger).Log("msg", "compaction iteration failed", "err", err)
level.Error(c.logger).Log("msg", "compaction iteration failed", "err", err, "duration", time.Since(start))
c.metrics.compactionCompleted.WithLabelValues(statusFailure).Inc()
c.metrics.compactionTime.WithLabelValues(statusFailure).Observe(time.Since(start).Seconds())
return err
}
level.Info(c.logger).Log("msg", "compaction iteration completed", "duration", time.Since(start))
c.metrics.compactionCompleted.WithLabelValues(statusSuccess).Inc()
c.metrics.compactionTime.WithLabelValues(statusSuccess).Observe(time.Since(start).Seconds())
}
}
}
@@ -252,14 +256,17 @@ func (c *Compactor) loadWork(ctx context.Context, ch chan<- tenantTable) error {
}

for tenants.Next() && tenants.Err() == nil && ctx.Err() == nil {
c.metrics.tenantsDiscovered.Inc()
tenant := tenants.At()
ownershipRange, owns, err := c.ownsTenant(tenant)
if err != nil {
return errors.Wrap(err, "checking tenant ownership")
}
if !owns {
c.metrics.tenantsSkipped.Inc()
continue
}
c.metrics.tenantsOwned.Inc()

select {
case ch <- tenantTable{tenant: tenant, table: table, ownershipRange: ownershipRange}:
@@ -296,7 +303,11 @@ func (c *Compactor) runWorkers(ctx context.Context, ch <-chan tenantTable) error
return nil
}

start := time.Now()
c.metrics.tenantsStarted.Inc()
if err := c.compactTenantTable(ctx, tt); err != nil {
c.metrics.tenantsCompleted.WithLabelValues(statusFailure).Inc()
c.metrics.tenantsCompletedTime.WithLabelValues(statusFailure).Observe(time.Since(start).Seconds())
return errors.Wrapf(
err,
"compacting tenant table (%s) for tenant (%s) with ownership (%s)",
@@ -305,6 +316,8 @@ func (c *Compactor) runWorkers(ctx context.Context, ch <-chan tenantTable) error
tt.ownershipRange,
)
}
c.metrics.tenantsCompleted.WithLabelValues(statusSuccess).Inc()
c.metrics.tenantsCompletedTime.WithLabelValues(statusSuccess).Observe(time.Since(start).Seconds())
}
}

15 changes: 10 additions & 5 deletions pkg/bloomcompactor/controller.go
@@ -25,7 +25,6 @@ type SimpleBloomController struct {
metrics *Metrics
limits Limits

// TODO(owen-d): add metrics
logger log.Logger
}

@@ -269,6 +268,7 @@ func (s *SimpleBloomController) buildGaps(
maxBlockSize = uint64(s.limits.BloomCompactorMaxBlockSize(tenant))
blockOpts = v1.NewBlockOptions(nGramSize, nGramSkip, maxBlockSize)
created []bloomshipper.Meta
totalSeries uint64
)

for _, plan := range work {
@@ -295,10 +295,15 @@
return nil, errors.Wrap(err, "failed to get series and blocks")
}

// Blocks are built consuming the series iterator. For observability, we wrap the series iterator
// with a counter iterator to count the number of times Next() is called on it.
// This is used to observe the number of series that are being processed.
seriesItrWithCounter := v1.NewCounterIter[*v1.Series](seriesItr)

gen := NewSimpleBloomGenerator(
tenant,
blockOpts,
seriesItr,
seriesItrWithCounter,
s.chunkLoader,
blocksIter,
s.rwFn,
@@ -307,9 +312,7 @@
)

_, loaded, newBlocks, err := gen.Generate(ctx)

if err != nil {
// TODO(owen-d): metrics
level.Error(logger).Log("msg", "failed to generate bloom", "err", err)
s.closeLoadedBlocks(loaded, blocksIter)
return nil, errors.Wrap(err, "failed to generate bloom")
@@ -338,7 +341,6 @@ }
}

if err := newBlocks.Err(); err != nil {
// TODO(owen-d): metrics
level.Error(logger).Log("msg", "failed to generate bloom", "err", err)
s.closeLoadedBlocks(loaded, blocksIter)
return nil, errors.Wrap(err, "failed to generate bloom")
@@ -360,9 +362,12 @@
return nil, errors.Wrap(err, "failed to write meta")
}
created = append(created, meta)

totalSeries += uint64(seriesItrWithCounter.Count())
}
}

s.metrics.tenantsSeries.Observe(float64(totalSeries))
level.Debug(logger).Log("msg", "finished bloom generation", "blocks", blockCt, "tsdbs", tsdbCt)
return created, nil
}
113 changes: 56 additions & 57 deletions pkg/bloomcompactor/metrics.go
@@ -16,105 +16,104 @@ const (
)

type Metrics struct {
bloomMetrics *v1.Metrics
chunkSize prometheus.Histogram // uncompressed size of all chunks summed per series
bloomMetrics *v1.Metrics
compactorRunning prometheus.Gauge
chunkSize prometheus.Histogram // uncompressed size of all chunks summed per series

compactionRunsStarted prometheus.Counter
compactionRunsCompleted *prometheus.CounterVec
compactionRunTime *prometheus.HistogramVec
compactionRunDiscoveredTenants prometheus.Counter
compactionRunSkippedTenants prometheus.Counter
compactionRunTenantsCompleted *prometheus.CounterVec
compactionRunTenantsTime *prometheus.HistogramVec
compactionRunJobStarted prometheus.Counter
compactionRunJobCompleted *prometheus.CounterVec
compactionRunJobTime *prometheus.HistogramVec
compactionRunInterval prometheus.Gauge
compactorRunning prometheus.Gauge
compactionsStarted prometheus.Counter
compactionCompleted *prometheus.CounterVec
compactionTime *prometheus.HistogramVec

tenantsDiscovered prometheus.Counter
tenantsOwned prometheus.Counter
tenantsSkipped prometheus.Counter
tenantsStarted prometheus.Counter
tenantsCompleted *prometheus.CounterVec
tenantsCompletedTime *prometheus.HistogramVec
tenantsSeries prometheus.Histogram
}

func NewMetrics(r prometheus.Registerer, bloomMetrics *v1.Metrics) *Metrics {
m := Metrics{
bloomMetrics: bloomMetrics,
compactorRunning: promauto.With(r).NewGauge(prometheus.GaugeOpts{
Namespace: metricsNamespace,
Subsystem: metricsSubsystem,
Name: "running",
Help: "Value will be 1 if compactor is currently running on this instance",
}),
chunkSize: promauto.With(r).NewHistogram(prometheus.HistogramOpts{
Name: "bloom_chunk_series_size",
Help: "Uncompressed size of chunks in a series",
Buckets: prometheus.ExponentialBucketsRange(1024, 1073741824, 10),
Namespace: metricsNamespace,
Subsystem: metricsSubsystem,
Name: "chunk_series_size",
Help: "Uncompressed size of chunks in a series",
Buckets: prometheus.ExponentialBucketsRange(1024, 1073741824, 10),
}),
compactionRunsStarted: promauto.With(r).NewCounter(prometheus.CounterOpts{

compactionsStarted: promauto.With(r).NewCounter(prometheus.CounterOpts{
Namespace: metricsNamespace,
Subsystem: metricsSubsystem,
Name: "runs_started_total",
Name: "compactions_started",
Help: "Total number of compactions started",
}),
compactionRunsCompleted: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
compactionCompleted: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
Namespace: metricsNamespace,
Subsystem: metricsSubsystem,
Name: "runs_completed_total",
Help: "Total number of compactions completed successfully",
Name: "compactions_completed",
Help: "Total number of compactions completed",
}, []string{"status"}),
compactionRunTime: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{
compactionTime: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{
Namespace: metricsNamespace,
Subsystem: metricsSubsystem,
Name: "runs_time_seconds",
Name: "compactions_time_seconds",
Help: "Time spent during a compaction cycle.",
Buckets: prometheus.DefBuckets,
}, []string{"status"}),
compactionRunDiscoveredTenants: promauto.With(r).NewCounter(prometheus.CounterOpts{

tenantsDiscovered: promauto.With(r).NewCounter(prometheus.CounterOpts{
Namespace: metricsNamespace,
Subsystem: metricsSubsystem,
Name: "tenants_discovered",
Help: "Number of tenants discovered during the current compaction run",
}),
compactionRunSkippedTenants: promauto.With(r).NewCounter(prometheus.CounterOpts{
tenantsOwned: promauto.With(r).NewCounter(prometheus.CounterOpts{
Namespace: metricsNamespace,
Subsystem: metricsSubsystem,
Name: "tenants_skipped",
Help: "Number of tenants skipped during the current compaction run",
Name: "tenants_owned",
Help: "Number of tenants owned by this instance",
}),
compactionRunTenantsCompleted: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
Namespace: metricsNamespace,
Subsystem: metricsSubsystem,
Name: "tenants_completed",
Help: "Number of tenants successfully processed during the current compaction run",
}, []string{"status"}),
compactionRunTenantsTime: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{
tenantsSkipped: promauto.With(r).NewCounter(prometheus.CounterOpts{
Namespace: metricsNamespace,
Subsystem: metricsSubsystem,
Name: "tenants_time_seconds",
Help: "Time spent processing tenants.",
Buckets: prometheus.DefBuckets,
}, []string{"status"}),
compactionRunJobStarted: promauto.With(r).NewCounter(prometheus.CounterOpts{
Name: "tenants_skipped",
Help: "Number of tenants skipped since they are not owned by this instance",
}),
tenantsStarted: promauto.With(r).NewCounter(prometheus.CounterOpts{
Namespace: metricsNamespace,
Subsystem: metricsSubsystem,
Name: "job_started",
Help: "Number of jobs started processing during the current compaction run",
Name: "tenants_started",
Help: "Number of tenants started to process during the current compaction run",
}),
compactionRunJobCompleted: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
tenantsCompleted: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
Namespace: metricsNamespace,
Subsystem: metricsSubsystem,
Name: "job_completed",
Help: "Number of jobs successfully processed during the current compaction run",
Name: "tenants_completed",
Help: "Number of tenants successfully processed during the current compaction run",
}, []string{"status"}),
compactionRunJobTime: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{
tenantsCompletedTime: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{
Namespace: metricsNamespace,
Subsystem: metricsSubsystem,
Name: "job_time_seconds",
Help: "Time spent processing jobs.",
Name: "tenants_time_seconds",
Help: "Time spent processing tenants.",
Buckets: prometheus.DefBuckets,
}, []string{"status"}),
compactionRunInterval: promauto.With(r).NewGauge(prometheus.GaugeOpts{
Namespace: metricsNamespace,
Subsystem: metricsSubsystem,
Name: "compaction_interval_seconds",
Help: "The configured interval on which compaction is run in seconds",
}),
compactorRunning: promauto.With(r).NewGauge(prometheus.GaugeOpts{
tenantsSeries: promauto.With(r).NewHistogram(prometheus.HistogramOpts{
Namespace: metricsNamespace,
Subsystem: metricsSubsystem,
Name: "running",
Help: "Value will be 1 if compactor is currently running on this instance",
Name: "tenants_series",
Help: "Number of series processed per tenant in the owned fingerprint-range.",
// Up to 10M series per tenant, way more than what we expect given our max_global_streams_per_user limits
Buckets: prometheus.ExponentialBucketsRange(1, 10000000, 10),
}),
}

26 changes: 26 additions & 0 deletions pkg/storage/bloom/v1/util.go
@@ -276,3 +276,29 @@ func NewPeekCloseIter[T any](itr CloseableIterator[T]) *PeekCloseIter[T] {
func (it *PeekCloseIter[T]) Close() error {
return it.close()
}

type CounterIterator[T any] interface {
Iterator[T]
Count() int
}

type CounterIter[T any] struct {
Iterator[T] // the underlying iterator
count int
}

func NewCounterIter[T any](itr Iterator[T]) *CounterIter[T] {
return &CounterIter[T]{Iterator: itr}
}

func (it *CounterIter[T]) Next() bool {
if it.Iterator.Next() {
it.count++
return true
}
return false
}

func (it *CounterIter[T]) Count() int {
return it.count
}
26 changes: 26 additions & 0 deletions pkg/storage/bloom/v1/util_test.go
@@ -26,3 +26,29 @@ func TestPeekingIterator(t *testing.T) {
require.False(t, itr.Next())

}

func TestCounterIter(t *testing.T) {
t.Parallel()

data := []int{1, 2, 3, 4, 5}
itr := NewCounterIter[int](NewSliceIter[int](data))
peekItr := NewPeekingIter[int](itr)

// Consume the outer iter and use peek
for {
if _, ok := peekItr.Peek(); !ok {
break
}
if !peekItr.Next() {
break
}
}
// Both iterators should be exhausted
require.False(t, itr.Next())
require.Nil(t, itr.Err())
require.False(t, peekItr.Next())
require.Nil(t, peekItr.Err())

// Assert that the count is correct and peeking hasn't jeopardized the count
require.Equal(t, len(data), itr.Count())
}
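
For readers piecing the changes together, here is a minimal, self-contained sketch (not part of this commit) of the pattern the diff introduces: an iterator wrapper that counts the items it yields, with the resulting count observed in an exponential-bucket histogram, analogous to v1.CounterIter feeding the new tenants_series metric. The iterator types and main function below are illustrative stand-ins; only the prometheus calls are real client_golang APIs.

    package main

    import (
    	"fmt"

    	"github.com/prometheus/client_golang/prometheus"
    )

    // sliceIter is a minimal stand-in for an iterator over series.
    type sliceIter[T any] struct {
    	items []T
    	cur   int
    }

    func (it *sliceIter[T]) Next() bool {
    	if it.cur >= len(it.items) {
    		return false
    	}
    	it.cur++
    	return true
    }

    func (it *sliceIter[T]) At() T { return it.items[it.cur-1] }

    // counterIter counts successful Next() calls, mirroring v1.CounterIter.
    type counterIter[T any] struct {
    	inner *sliceIter[T]
    	count int
    }

    func (it *counterIter[T]) Next() bool {
    	if it.inner.Next() {
    		it.count++
    		return true
    	}
    	return false
    }

    func (it *counterIter[T]) At() T      { return it.inner.At() }
    func (it *counterIter[T]) Count() int { return it.count }

    func main() {
    	// Histogram analogous to the commit's tenantsSeries metric;
    	// registration with a Registerer is omitted in this sketch.
    	tenantsSeries := prometheus.NewHistogram(prometheus.HistogramOpts{
    		Name:    "tenants_series",
    		Help:    "Number of series processed per tenant.",
    		Buckets: prometheus.ExponentialBucketsRange(1, 10_000_000, 10),
    	})

    	// Wrap the base iterator, consume it, then observe how many items were processed.
    	itr := &counterIter[string]{inner: &sliceIter[string]{items: []string{"s1", "s2", "s3"}}}
    	for itr.Next() {
    		_ = itr.At() // block building would consume each series here
    	}

    	tenantsSeries.Observe(float64(itr.Count()))
    	fmt.Println("series processed:", itr.Count())
    }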
