From 6ac3247016fc53d39b208a5c39b55b82d01b655c Mon Sep 17 00:00:00 2001 From: Salva Corts Date: Wed, 20 Nov 2024 12:29:47 +0100 Subject: [PATCH] feat(blooms): Add task timining and sizing metrics (#15032) (cherry picked from commit 79cccfd50bddc150b8f1e89f66774a117c91f06c) --- pkg/bloombuild/planner/metrics.go | 9 +++++ pkg/bloombuild/planner/planner.go | 30 +++++++++------ .../planner/strategies/chunksize.go | 38 ++++++++++++++++--- .../planner/strategies/chunksize_test.go | 3 +- pkg/bloombuild/planner/strategies/factory.go | 37 +++++++++++++++--- 5 files changed, 93 insertions(+), 24 deletions(-) diff --git a/pkg/bloombuild/planner/metrics.go b/pkg/bloombuild/planner/metrics.go index 936515ad736f3..d38debd6ca85e 100644 --- a/pkg/bloombuild/planner/metrics.go +++ b/pkg/bloombuild/planner/metrics.go @@ -40,6 +40,7 @@ type Metrics struct { tenantsDiscovered prometheus.Counter tenantTasksPlanned *prometheus.GaugeVec tenantTasksCompleted *prometheus.GaugeVec + tenantTasksTiming *prometheus.HistogramVec // Retention metrics retentionRunning prometheus.Gauge @@ -166,6 +167,14 @@ func NewMetrics( Name: "tenant_tasks_completed", Help: "Number of tasks completed for a tenant during the current build iteration.", }, []string{"tenant", "status"}), + tenantTasksTiming: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{ + Namespace: metricsNamespace, + Subsystem: metricsSubsystem, + Name: "tenant_tasks_time_seconds", + Help: "Time spent building tasks for a tenant during the current build iteration.", + // 1s --> 1h (steps of 1 minute) + Buckets: prometheus.LinearBuckets(1, 60, 60), + }, []string{"tenant", "status"}), // Retention retentionRunning: promauto.With(r).NewGauge(prometheus.GaugeOpts{ diff --git a/pkg/bloombuild/planner/planner.go b/pkg/bloombuild/planner/planner.go index 9ef59a9b7f855..b89c2001eab2c 100644 --- a/pkg/bloombuild/planner/planner.go +++ b/pkg/bloombuild/planner/planner.go @@ -49,7 +49,8 @@ type Planner struct { tsdbStore common.TSDBStore bloomStore bloomshipper.StoreBase - tasksQueue *queue.Queue + tasksQueue *queue.Queue + planFactory *strategies.Factory metrics *Metrics logger log.Logger @@ -86,14 +87,15 @@ func New( } p := &Planner{ - cfg: cfg, - limits: limits, - schemaCfg: schemaCfg, - tsdbStore: tsdbStore, - bloomStore: bloomStore, - tasksQueue: tasksQueue, - metrics: NewMetrics(r, tasksQueue.GetConnectedConsumersMetric), - logger: logger, + cfg: cfg, + limits: limits, + schemaCfg: schemaCfg, + tsdbStore: tsdbStore, + bloomStore: bloomStore, + tasksQueue: tasksQueue, + planFactory: strategies.NewFactory(limits, strategies.NewMetrics(r), logger), + metrics: NewMetrics(r, tasksQueue.GetConnectedConsumersMetric), + logger: logger, } p.retentionManager = NewRetentionManager( @@ -370,7 +372,7 @@ func (p *Planner) computeTasks( table config.DayTable, tenant string, ) ([]*protos.Task, []bloomshipper.Meta, error) { - strategy, err := strategies.NewStrategy(tenant, p.limits, p.logger) + strategy, err := p.planFactory.GetStrategy(tenant) if err != nil { return nil, nil, fmt.Errorf("error creating strategy: %w", err) } @@ -770,8 +772,10 @@ func (p *Planner) BuilderLoop(builder protos.PlannerForBuilder_BuilderLoopServer continue } + startTime := time.Now() result, err := p.forwardTaskToBuilder(builder, builderID, task) if err != nil { + p.metrics.tenantTasksTiming.WithLabelValues(task.Tenant, statusFailure).Observe(time.Since(startTime).Seconds()) maxRetries := p.limits.BloomTaskMaxRetries(task.Tenant) if maxRetries > 0 && int(task.timesEnqueued.Load()) >= maxRetries { p.tasksQueue.Release(task.ProtoTask) @@ -811,10 +815,12 @@ func (p *Planner) BuilderLoop(builder protos.PlannerForBuilder_BuilderLoopServer level.Debug(logger).Log( "msg", "task completed", - "duration", time.Since(task.queueTime).Seconds(), + "timeSinceEnqueued", time.Since(task.queueTime).Seconds(), + "buildTime", time.Since(startTime).Seconds(), "retries", task.timesEnqueued.Load()-1, // -1 because the first enqueue is not a retry ) p.tasksQueue.Release(task.ProtoTask) + p.metrics.tenantTasksTiming.WithLabelValues(task.Tenant, statusSuccess).Observe(time.Since(startTime).Seconds()) // Send the result back to the task. The channel is buffered, so this should not block. task.resultsChannel <- result @@ -866,7 +872,7 @@ func (p *Planner) forwardTaskToBuilder( case err := <-errCh: return nil, err case <-timeout: - return nil, fmt.Errorf("timeout waiting for response from builder (%s)", builderID) + return nil, fmt.Errorf("timeout (%s) waiting for response from builder (%s)", taskTimeout, builderID) } } diff --git a/pkg/bloombuild/planner/strategies/chunksize.go b/pkg/bloombuild/planner/strategies/chunksize.go index 456183aa62ef5..c2204af72bf84 100644 --- a/pkg/bloombuild/planner/strategies/chunksize.go +++ b/pkg/bloombuild/planner/strategies/chunksize.go @@ -9,6 +9,8 @@ import ( "github.com/dustin/go-humanize" "github.com/go-kit/log" "github.com/go-kit/log/level" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" @@ -21,22 +23,47 @@ import ( "github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb/index" ) +const ( + metricsNamespace = "loki" + metricsSubsystem = "bloomplanner" +) + +type ChunkSizeStrategyMetrics struct { + tenantTaskSize *prometheus.HistogramVec +} + +func NewChunkSizeStrategyMetrics(r prometheus.Registerer) *ChunkSizeStrategyMetrics { + return &ChunkSizeStrategyMetrics{ + tenantTaskSize: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{ + Namespace: metricsNamespace, + Subsystem: metricsSubsystem, + Name: "tenant_task_size_bytes", + Help: "Size of tasks generated by the chunk size strategy", + // 1GB --> 512GB + Buckets: prometheus.ExponentialBuckets(1e9, 2, 10), + }, []string{"tenant"}), + } +} + type ChunkSizeStrategyLimits interface { BloomTaskTargetSeriesChunksSizeBytes(tenantID string) uint64 } type ChunkSizeStrategy struct { - limits ChunkSizeStrategyLimits - logger log.Logger + limits ChunkSizeStrategyLimits + metrics *ChunkSizeStrategyMetrics + logger log.Logger } func NewChunkSizeStrategy( limits ChunkSizeStrategyLimits, + metrics *ChunkSizeStrategyMetrics, logger log.Logger, ) (*ChunkSizeStrategy, error) { return &ChunkSizeStrategy{ - limits: limits, - logger: logger, + limits: limits, + metrics: metrics, + logger: logger, }, nil } @@ -82,8 +109,9 @@ func (s *ChunkSizeStrategy) Plan( continue } - bounds := series.Bounds() + s.metrics.tenantTaskSize.WithLabelValues(tenant).Observe(float64(series.Size())) + bounds := series.Bounds() blocks, err := getBlocksMatchingBounds(metas, bounds) if err != nil { return nil, fmt.Errorf("failed to get blocks matching bounds: %w", err) diff --git a/pkg/bloombuild/planner/strategies/chunksize_test.go b/pkg/bloombuild/planner/strategies/chunksize_test.go index 951d033e5c100..c59e13fdfdd1c 100644 --- a/pkg/bloombuild/planner/strategies/chunksize_test.go +++ b/pkg/bloombuild/planner/strategies/chunksize_test.go @@ -5,6 +5,7 @@ import ( "testing" "github.com/go-kit/log" + "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/require" "github.com/grafana/loki/v3/pkg/bloombuild/planner/plannertest" @@ -228,7 +229,7 @@ func Test_ChunkSizeStrategy_Plan(t *testing.T) { logger := log.NewNopLogger() //logger := log.NewLogfmtLogger(os.Stdout) - strategy, err := NewChunkSizeStrategy(tc.limits, logger) + strategy, err := NewChunkSizeStrategy(tc.limits, NewChunkSizeStrategyMetrics(prometheus.NewPedanticRegistry()), logger) require.NoError(t, err) actual, err := strategy.Plan(context.Background(), plannertest.TestTable, "fake", tc.tsdbs, tc.originalMetas) diff --git a/pkg/bloombuild/planner/strategies/factory.go b/pkg/bloombuild/planner/strategies/factory.go index f58f91e51708d..d48d4e7f59f87 100644 --- a/pkg/bloombuild/planner/strategies/factory.go +++ b/pkg/bloombuild/planner/strategies/factory.go @@ -5,6 +5,7 @@ import ( "fmt" "github.com/go-kit/log" + "github.com/prometheus/client_golang/prometheus" "github.com/grafana/loki/v3/pkg/bloombuild/common" "github.com/grafana/loki/v3/pkg/bloombuild/protos" @@ -32,18 +33,42 @@ type PlanningStrategy interface { Plan(ctx context.Context, table config.DayTable, tenant string, tsdbs TSDBSet, metas []bloomshipper.Meta) ([]*protos.Task, error) } -func NewStrategy( - tenantID string, +type Metrics struct { + *ChunkSizeStrategyMetrics +} + +func NewMetrics(reg prometheus.Registerer) *Metrics { + return &Metrics{ + ChunkSizeStrategyMetrics: NewChunkSizeStrategyMetrics(reg), + } +} + +type Factory struct { + limits Limits + logger log.Logger + metrics *Metrics +} + +func NewFactory( limits Limits, + metrics *Metrics, logger log.Logger, -) (PlanningStrategy, error) { - strategy := limits.BloomPlanningStrategy(tenantID) +) *Factory { + return &Factory{ + limits: limits, + logger: logger, + metrics: metrics, + } +} + +func (f *Factory) GetStrategy(tenantID string) (PlanningStrategy, error) { + strategy := f.limits.BloomPlanningStrategy(tenantID) switch strategy { case SplitKeyspaceStrategyName: - return NewSplitKeyspaceStrategy(limits, logger) + return NewSplitKeyspaceStrategy(f.limits, f.logger) case SplitBySeriesChunkSizeStrategyName: - return NewChunkSizeStrategy(limits, logger) + return NewChunkSizeStrategy(f.limits, f.metrics.ChunkSizeStrategyMetrics, f.logger) default: return nil, fmt.Errorf("unknown bloom planning strategy (%s)", strategy) }