diff --git a/docs/sources/shared/configuration.md b/docs/sources/shared/configuration.md index 9cf3b2a18ed0f..b64879703c960 100644 --- a/docs/sources/shared/configuration.md +++ b/docs/sources/shared/configuration.md @@ -230,6 +230,10 @@ block_scheduler: # CLI flag: -block-scheduler.target-record-count [target_record_count: | default = 1000] + # Maximum number of jobs that the planner can return. + # CLI flag: -block-scheduler.max-jobs-planned-per-interval + [max_jobs_planned_per_interval: | default = 100] + pattern_ingester: # Whether the pattern ingester is enabled. # CLI flag: -pattern-ingester.enabled diff --git a/pkg/blockbuilder/scheduler/queue.go b/pkg/blockbuilder/scheduler/queue.go index 1aeb15e8395e5..9b4dd4292c109 100644 --- a/pkg/blockbuilder/scheduler/queue.go +++ b/pkg/blockbuilder/scheduler/queue.go @@ -7,6 +7,8 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" "github.com/grafana/loki/v3/pkg/blockbuilder/types" ) @@ -35,6 +37,29 @@ func NewJobWithMetadata(job *types.Job, priority int) *JobWithMetadata { } } +type jobQueueMetrics struct { + pending prometheus.Gauge + inProgress prometheus.Gauge + completed *prometheus.CounterVec +} + +func newJobQueueMetrics(r prometheus.Registerer) *jobQueueMetrics { + return &jobQueueMetrics{ + pending: promauto.With(r).NewGauge(prometheus.GaugeOpts{ + Name: "loki_block_scheduler_pending_jobs", + Help: "Number of jobs in the block scheduler queue", + }), + inProgress: promauto.With(r).NewGauge(prometheus.GaugeOpts{ + Name: "loki_block_scheduler_in_progress_jobs", + Help: "Number of jobs currently being processed", + }), + completed: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ + Name: "loki_block_scheduler_completed_jobs_total", + Help: "Total number of jobs completed by the block scheduler", + }, []string{"status"}), + } +} + // JobQueue manages the queue of pending jobs and tracks their state. type JobQueue struct { logger log.Logger @@ -42,10 +67,12 @@ type JobQueue struct { inProgress map[string]*JobWithMetadata // Jobs currently being processed completed *CircularBuffer[*JobWithMetadata] // Last N completed jobs statusMap map[string]types.JobStatus // Maps job ID to its current status + metrics *jobQueueMetrics mu sync.RWMutex } -func NewJobQueueWithLogger(logger log.Logger) *JobQueue { +// NewJobQueue creates a new job queue instance +func NewJobQueue(logger log.Logger, reg prometheus.Registerer) *JobQueue { return &JobQueue{ logger: logger, pending: NewPriorityQueue( @@ -57,14 +84,10 @@ func NewJobQueueWithLogger(logger log.Logger) *JobQueue { inProgress: make(map[string]*JobWithMetadata), completed: NewCircularBuffer[*JobWithMetadata](defaultCompletedJobsCapacity), statusMap: make(map[string]types.JobStatus), + metrics: newJobQueueMetrics(reg), } } -// NewJobQueue creates a new job queue instance -func NewJobQueue() *JobQueue { - return NewJobQueueWithLogger(log.NewNopLogger()) -} - // Exists checks if a job exists in any state and returns its status func (q *JobQueue) Exists(job *types.Job) (types.JobStatus, bool) { q.mu.RLock() @@ -111,6 +134,7 @@ func (q *JobQueue) Enqueue(job *types.Job, priority int) error { jobMeta := NewJobWithMetadata(job, priority) q.pending.Push(jobMeta) q.statusMap[job.ID()] = types.JobStatusPending + q.metrics.pending.Inc() return nil } @@ -123,6 +147,7 @@ func (q *JobQueue) Dequeue() (*types.Job, bool) { if !ok { return nil, false } + q.metrics.pending.Dec() // Update metadata for in-progress state jobMeta.Status = types.JobStatusInProgress @@ -131,6 +156,7 @@ func (q *JobQueue) Dequeue() (*types.Job, bool) { q.inProgress[jobMeta.ID()] = jobMeta q.statusMap[jobMeta.ID()] = types.JobStatusInProgress + q.metrics.inProgress.Inc() return jobMeta.Job, true } @@ -152,6 +178,7 @@ func (q *JobQueue) RemoveInProgress(id string) { defer q.mu.Unlock() delete(q.inProgress, id) + q.metrics.inProgress.Dec() } // MarkComplete moves a job from in-progress to completed with the given status @@ -169,11 +196,13 @@ func (q *JobQueue) MarkComplete(id string, status types.JobStatus) { case types.JobStatusInProgress: // update & remove from in progress delete(q.inProgress, id) + q.metrics.inProgress.Dec() case types.JobStatusPending: _, ok := q.pending.Remove(id) if !ok { level.Error(q.logger).Log("msg", "failed to remove job from pending queue", "job", id) } + q.metrics.pending.Dec() default: level.Error(q.logger).Log("msg", "unknown job status, cannot mark as complete", "job", id, "status", status) } @@ -187,6 +216,7 @@ func (q *JobQueue) MarkComplete(id string, status types.JobStatus) { delete(q.statusMap, removal.ID()) } q.statusMap[id] = status + q.metrics.completed.WithLabelValues(status.String()).Inc() } // SyncJob registers a job as in-progress or updates its UpdateTime if already in progress @@ -204,6 +234,7 @@ func (q *JobQueue) SyncJob(jobID string, job *types.Job) { jobMeta.Status = types.JobStatusInProgress q.inProgress[jobID] = jobMeta q.statusMap[jobID] = types.JobStatusInProgress + q.metrics.inProgress.Inc() } jobMeta, ok := q.existsLockLess(jobID) @@ -221,6 +252,8 @@ func (q *JobQueue) SyncJob(jobID string, job *types.Job) { level.Error(q.logger).Log("msg", "failed to remove job from pending queue", "job", jobID) } jobMeta.Status = types.JobStatusInProgress + q.metrics.pending.Dec() + q.metrics.inProgress.Inc() case types.JobStatusInProgress: case types.JobStatusComplete, types.JobStatusFailed, types.JobStatusExpired: // Job already completed, re-enqueue a new one diff --git a/pkg/blockbuilder/scheduler/queue_test.go b/pkg/blockbuilder/scheduler/queue_test.go index dfbe07681c62a..cf86ec29f8941 100644 --- a/pkg/blockbuilder/scheduler/queue_test.go +++ b/pkg/blockbuilder/scheduler/queue_test.go @@ -4,6 +4,7 @@ import ( "testing" "time" + "github.com/go-kit/log" "github.com/stretchr/testify/require" "github.com/grafana/loki/v3/pkg/blockbuilder/types" @@ -11,7 +12,7 @@ import ( func TestJobQueue_SyncJob(t *testing.T) { t.Run("non-existent to in-progress", func(t *testing.T) { - q := NewJobQueue() + q := NewJobQueue(log.NewNopLogger(), nil) job := types.NewJob(1, types.Offsets{Min: 100, Max: 200}) jobID := job.ID() @@ -28,7 +29,7 @@ func TestJobQueue_SyncJob(t *testing.T) { }) t.Run("pending to in-progress", func(t *testing.T) { - q := NewJobQueue() + q := NewJobQueue(log.NewNopLogger(), nil) job := types.NewJob(1, types.Offsets{Min: 100, Max: 200}) // Start with pending job @@ -51,7 +52,7 @@ func TestJobQueue_SyncJob(t *testing.T) { }) t.Run("already in-progress", func(t *testing.T) { - q := NewJobQueue() + q := NewJobQueue(log.NewNopLogger(), nil) job := types.NewJob(1, types.Offsets{Min: 100, Max: 200}) // First sync to put in in-progress @@ -72,7 +73,7 @@ func TestJobQueue_SyncJob(t *testing.T) { func TestJobQueue_MarkComplete(t *testing.T) { t.Run("in-progress to complete", func(t *testing.T) { - q := NewJobQueue() + q := NewJobQueue(log.NewNopLogger(), nil) job := types.NewJob(1, types.Offsets{Min: 100, Max: 200}) // Start with in-progress job @@ -102,7 +103,7 @@ func TestJobQueue_MarkComplete(t *testing.T) { }) t.Run("pending to complete", func(t *testing.T) { - q := NewJobQueue() + q := NewJobQueue(log.NewNopLogger(), nil) job := types.NewJob(1, types.Offsets{Min: 100, Max: 200}) // Start with pending job @@ -129,7 +130,7 @@ func TestJobQueue_MarkComplete(t *testing.T) { }) t.Run("non-existent job", func(t *testing.T) { - q := NewJobQueue() + q := NewJobQueue(log.NewNopLogger(), nil) logger := &testLogger{t: t} q.logger = logger @@ -138,7 +139,7 @@ func TestJobQueue_MarkComplete(t *testing.T) { }) t.Run("already completed job", func(t *testing.T) { - q := NewJobQueue() + q := NewJobQueue(log.NewNopLogger(), nil) logger := &testLogger{t: t} q.logger = logger diff --git a/pkg/blockbuilder/scheduler/scheduler.go b/pkg/blockbuilder/scheduler/scheduler.go index 55aa95459a1d9..8055fef5ddc95 100644 --- a/pkg/blockbuilder/scheduler/scheduler.go +++ b/pkg/blockbuilder/scheduler/scheduler.go @@ -24,11 +24,12 @@ var ( ) type Config struct { - ConsumerGroup string `yaml:"consumer_group"` - Interval time.Duration `yaml:"interval"` - LookbackPeriod time.Duration `yaml:"lookback_period"` - Strategy string `yaml:"strategy"` - TargetRecordCount int64 `yaml:"target_record_count"` + ConsumerGroup string `yaml:"consumer_group"` + Interval time.Duration `yaml:"interval"` + LookbackPeriod time.Duration `yaml:"lookback_period"` + Strategy string `yaml:"strategy"` + TargetRecordCount int64 `yaml:"target_record_count"` + MaxJobsPlannedPerInterval int `yaml:"max_jobs_planned_per_interval"` } func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { @@ -53,6 +54,12 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { RecordCountStrategy, ), ) + f.IntVar( + &cfg.MaxJobsPlannedPerInterval, + prefix+"max-jobs-planned-per-interval", + 100, + "Maximum number of jobs that the planner can return.", + ) } func (cfg *Config) RegisterFlags(f *flag.FlagSet) { @@ -143,7 +150,7 @@ func (s *BlockScheduler) runOnce(ctx context.Context) error { s.publishLagMetrics(lag) - jobs, err := s.planner.Plan(ctx) + jobs, err := s.planner.Plan(ctx, s.cfg.MaxJobsPlannedPerInterval) if err != nil { level.Error(s.logger).Log("msg", "failed to plan jobs", "err", err) } diff --git a/pkg/blockbuilder/scheduler/scheduler_test.go b/pkg/blockbuilder/scheduler/scheduler_test.go index 48460f5e39856..1f5654312c144 100644 --- a/pkg/blockbuilder/scheduler/scheduler_test.go +++ b/pkg/blockbuilder/scheduler/scheduler_test.go @@ -41,7 +41,7 @@ func (m *mockOffsetManager) Commit(_ context.Context, _ int32, _ int64) error { } func newTestEnv(builderID string) (*testEnv, error) { - queue := NewJobQueue() + queue := NewJobQueue(log.NewNopLogger(), nil) mockOffsetMgr := &mockOffsetManager{ topic: "test-topic", consumerGroup: "test-group", diff --git a/pkg/blockbuilder/scheduler/strategy.go b/pkg/blockbuilder/scheduler/strategy.go index e710d14767275..78468bbea97ae 100644 --- a/pkg/blockbuilder/scheduler/strategy.go +++ b/pkg/blockbuilder/scheduler/strategy.go @@ -19,7 +19,7 @@ type OffsetReader interface { type Planner interface { Name() string - Plan(ctx context.Context) ([]*JobWithMetadata, error) + Plan(ctx context.Context, maxJobsPerPartition int) ([]*JobWithMetadata, error) } const ( @@ -51,7 +51,7 @@ func (p *RecordCountPlanner) Name() string { return RecordCountStrategy } -func (p *RecordCountPlanner) Plan(ctx context.Context) ([]*JobWithMetadata, error) { +func (p *RecordCountPlanner) Plan(ctx context.Context, maxJobsPerPartition int) ([]*JobWithMetadata, error) { offsets, err := p.offsetReader.GroupLag(ctx, p.lookbackPeriod) if err != nil { level.Error(p.logger).Log("msg", "failed to get group lag", "err", err) @@ -60,9 +60,10 @@ func (p *RecordCountPlanner) Plan(ctx context.Context) ([]*JobWithMetadata, erro jobs := make([]*JobWithMetadata, 0, len(offsets)) for _, partitionOffset := range offsets { - // kadm.GroupMemberLag contains valid Commit.At even when consumer group never committed any offset. - // no additional validation is needed here - startOffset := partitionOffset.Commit.At + 1 + // 1. kadm.GroupMemberLag contains valid Commit.At even when consumer group never committed any offset. + // no additional validation is needed here + // 2. committed offset could be behind start offset if we are falling behind retention period. + startOffset := max(partitionOffset.Commit.At+1, partitionOffset.Start.Offset) endOffset := partitionOffset.End.Offset // Skip if there's no lag @@ -70,10 +71,15 @@ func (p *RecordCountPlanner) Plan(ctx context.Context) ([]*JobWithMetadata, erro continue } + var jobCount int + currentStart := startOffset // Create jobs of size targetRecordCount until we reach endOffset - for currentStart := startOffset; currentStart < endOffset; { - currentEnd := min(currentStart+p.targetRecordCount, endOffset) + for currentStart < endOffset { + if maxJobsPerPartition > 0 && jobCount >= maxJobsPerPartition { + break + } + currentEnd := min(currentStart+p.targetRecordCount, endOffset) job := NewJobWithMetadata( types.NewJob(partitionOffset.Partition, types.Offsets{ Min: currentStart, @@ -84,6 +90,7 @@ func (p *RecordCountPlanner) Plan(ctx context.Context) ([]*JobWithMetadata, erro jobs = append(jobs, job) currentStart = currentEnd + jobCount++ } } diff --git a/pkg/blockbuilder/scheduler/strategy_test.go b/pkg/blockbuilder/scheduler/strategy_test.go index 7f6dedcb4abe6..bba62df867c8e 100644 --- a/pkg/blockbuilder/scheduler/strategy_test.go +++ b/pkg/blockbuilder/scheduler/strategy_test.go @@ -147,8 +147,7 @@ func TestRecordCountPlanner_Plan(t *testing.T) { } require.NoError(t, cfg.Validate()) planner := NewRecordCountPlanner(mockReader, tc.recordCount, time.Hour, log.NewNopLogger()) - - jobs, err := planner.Plan(context.Background()) + jobs, err := planner.Plan(context.Background(), 0) require.NoError(t, err) require.Equal(t, len(tc.expectedJobs), len(jobs)) diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index 49a26498b8e31..c92f56dfcff50 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -1879,7 +1879,7 @@ func (t *Loki) initBlockScheduler() (services.Service, error) { s, err := blockscheduler.NewScheduler( t.Cfg.BlockScheduler, - blockscheduler.NewJobQueueWithLogger(logger), + blockscheduler.NewJobQueue(logger, prometheus.DefaultRegisterer), offsetManager, logger, prometheus.DefaultRegisterer,