chore(block-scheduler): planner improvements to limit jobs and consider retention when planning (#15432)

ashwanthgoli authored Dec 17, 2024
1 parent e6d82b9 commit fe50c72
Showing 8 changed files with 81 additions and 30 deletions.
4 changes: 4 additions & 0 deletions docs/sources/shared/configuration.md
@@ -230,6 +230,10 @@ block_scheduler:
# CLI flag: -block-scheduler.target-record-count
[target_record_count: <int> | default = 1000]

# Maximum number of jobs that the planner can return.
# CLI flag: -block-scheduler.max-jobs-planned-per-interval
[max_jobs_planned_per_interval: <int> | default = 100]

pattern_ingester:
# Whether the pattern ingester is enabled.
# CLI flag: -pattern-ingester.enabled
45 changes: 39 additions & 6 deletions pkg/blockbuilder/scheduler/queue.go
@@ -7,6 +7,8 @@ import (

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

"github.com/grafana/loki/v3/pkg/blockbuilder/types"
)
@@ -35,17 +37,42 @@ func NewJobWithMetadata(job *types.Job, priority int) *JobWithMetadata {
}
}

type jobQueueMetrics struct {
pending prometheus.Gauge
inProgress prometheus.Gauge
completed *prometheus.CounterVec
}

func newJobQueueMetrics(r prometheus.Registerer) *jobQueueMetrics {
return &jobQueueMetrics{
pending: promauto.With(r).NewGauge(prometheus.GaugeOpts{
Name: "loki_block_scheduler_pending_jobs",
Help: "Number of jobs in the block scheduler queue",
}),
inProgress: promauto.With(r).NewGauge(prometheus.GaugeOpts{
Name: "loki_block_scheduler_in_progress_jobs",
Help: "Number of jobs currently being processed",
}),
completed: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
Name: "loki_block_scheduler_completed_jobs_total",
Help: "Total number of jobs completed by the block scheduler",
}, []string{"status"}),
}
}

// JobQueue manages the queue of pending jobs and tracks their state.
type JobQueue struct {
logger log.Logger
pending *PriorityQueue[string, *JobWithMetadata] // Jobs waiting to be processed, ordered by priority
inProgress map[string]*JobWithMetadata // Jobs currently being processed
completed *CircularBuffer[*JobWithMetadata] // Last N completed jobs
statusMap map[string]types.JobStatus // Maps job ID to its current status
metrics *jobQueueMetrics
mu sync.RWMutex
}

func NewJobQueueWithLogger(logger log.Logger) *JobQueue {
// NewJobQueue creates a new job queue instance
func NewJobQueue(logger log.Logger, reg prometheus.Registerer) *JobQueue {
return &JobQueue{
logger: logger,
pending: NewPriorityQueue(
@@ -57,14 +84,10 @@ func NewJobQueueWithLogger(logger log.Logger) *JobQueue {
inProgress: make(map[string]*JobWithMetadata),
completed: NewCircularBuffer[*JobWithMetadata](defaultCompletedJobsCapacity),
statusMap: make(map[string]types.JobStatus),
metrics: newJobQueueMetrics(reg),
}
}

// NewJobQueue creates a new job queue instance
func NewJobQueue() *JobQueue {
return NewJobQueueWithLogger(log.NewNopLogger())
}

// Exists checks if a job exists in any state and returns its status
func (q *JobQueue) Exists(job *types.Job) (types.JobStatus, bool) {
q.mu.RLock()
@@ -111,6 +134,7 @@ func (q *JobQueue) Enqueue(job *types.Job, priority int) error {
jobMeta := NewJobWithMetadata(job, priority)
q.pending.Push(jobMeta)
q.statusMap[job.ID()] = types.JobStatusPending
q.metrics.pending.Inc()
return nil
}

@@ -123,6 +147,7 @@ func (q *JobQueue) Dequeue() (*types.Job, bool) {
if !ok {
return nil, false
}
q.metrics.pending.Dec()

// Update metadata for in-progress state
jobMeta.Status = types.JobStatusInProgress
@@ -131,6 +156,7 @@ func (q *JobQueue) Dequeue() (*types.Job, bool) {

q.inProgress[jobMeta.ID()] = jobMeta
q.statusMap[jobMeta.ID()] = types.JobStatusInProgress
q.metrics.inProgress.Inc()

return jobMeta.Job, true
}
@@ -152,6 +178,7 @@ func (q *JobQueue) RemoveInProgress(id string) {
defer q.mu.Unlock()

delete(q.inProgress, id)
q.metrics.inProgress.Dec()
}

// MarkComplete moves a job from in-progress to completed with the given status
@@ -169,11 +196,13 @@ func (q *JobQueue) MarkComplete(id string, status types.JobStatus) {
case types.JobStatusInProgress:
// update & remove from in progress
delete(q.inProgress, id)
q.metrics.inProgress.Dec()
case types.JobStatusPending:
_, ok := q.pending.Remove(id)
if !ok {
level.Error(q.logger).Log("msg", "failed to remove job from pending queue", "job", id)
}
q.metrics.pending.Dec()
default:
level.Error(q.logger).Log("msg", "unknown job status, cannot mark as complete", "job", id, "status", status)
}
@@ -187,6 +216,7 @@ func (q *JobQueue) MarkComplete(id string, status types.JobStatus) {
delete(q.statusMap, removal.ID())
}
q.statusMap[id] = status
q.metrics.completed.WithLabelValues(status.String()).Inc()
}

// SyncJob registers a job as in-progress or updates its UpdateTime if already in progress
@@ -204,6 +234,7 @@ func (q *JobQueue) SyncJob(jobID string, job *types.Job) {
jobMeta.Status = types.JobStatusInProgress
q.inProgress[jobID] = jobMeta
q.statusMap[jobID] = types.JobStatusInProgress
q.metrics.inProgress.Inc()
}

jobMeta, ok := q.existsLockLess(jobID)
@@ -221,6 +252,8 @@ func (q *JobQueue) SyncJob(jobID string, job *types.Job) {
level.Error(q.logger).Log("msg", "failed to remove job from pending queue", "job", jobID)
}
jobMeta.Status = types.JobStatusInProgress
q.metrics.pending.Dec()
q.metrics.inProgress.Inc()
case types.JobStatusInProgress:
case types.JobStatusComplete, types.JobStatusFailed, types.JobStatusExpired:
// Job already completed, re-enqueue a new one
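
The new jobQueueMetrics ties a pending gauge, an in-progress gauge, and a per-status completed counter to the queue's state transitions (Enqueue, Dequeue, MarkComplete, SyncJob). As a rough illustration only, not part of this commit, a package-local test in the style of queue_test.go could check those transitions with prometheus/testutil; the test name and the exact assertions below are assumptions.

```go
// Sketch of a package-local metrics test; assumes the package is named
// "scheduler" and that the constructors match the ones used in queue_test.go.
package scheduler

import (
	"testing"

	"github.com/go-kit/log"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
	"github.com/stretchr/testify/require"

	"github.com/grafana/loki/v3/pkg/blockbuilder/types"
)

func TestJobQueue_MetricsSketch(t *testing.T) {
	q := NewJobQueue(log.NewNopLogger(), prometheus.NewRegistry())

	job := types.NewJob(1, types.Offsets{Min: 100, Max: 200})
	require.NoError(t, q.Enqueue(job, 10))
	// Enqueue increments the pending gauge.
	require.Equal(t, 1.0, testutil.ToFloat64(q.metrics.pending))

	_, ok := q.Dequeue()
	require.True(t, ok)
	// Dequeue moves the job to in-progress: pending down, in-progress up.
	require.Equal(t, 0.0, testutil.ToFloat64(q.metrics.pending))
	require.Equal(t, 1.0, testutil.ToFloat64(q.metrics.inProgress))

	q.MarkComplete(job.ID(), types.JobStatusComplete)
	// Completion decrements in-progress and bumps the per-status counter.
	require.Equal(t, 0.0, testutil.ToFloat64(q.metrics.inProgress))
	require.Equal(t, 1.0, testutil.ToFloat64(q.metrics.completed.WithLabelValues(types.JobStatusComplete.String())))
}
```

Passing a nil registerer, as the updated tests do, remains safe because promauto.With(nil) simply skips registration.
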
15 changes: 8 additions & 7 deletions pkg/blockbuilder/scheduler/queue_test.go
@@ -4,14 +4,15 @@ import (
"testing"
"time"

"github.com/go-kit/log"
"github.com/stretchr/testify/require"

"github.com/grafana/loki/v3/pkg/blockbuilder/types"
)

func TestJobQueue_SyncJob(t *testing.T) {
t.Run("non-existent to in-progress", func(t *testing.T) {
q := NewJobQueue()
q := NewJobQueue(log.NewNopLogger(), nil)
job := types.NewJob(1, types.Offsets{Min: 100, Max: 200})
jobID := job.ID()

@@ -28,7 +29,7 @@ func TestJobQueue_SyncJob(t *testing.T) {
})

t.Run("pending to in-progress", func(t *testing.T) {
q := NewJobQueue()
q := NewJobQueue(log.NewNopLogger(), nil)
job := types.NewJob(1, types.Offsets{Min: 100, Max: 200})

// Start with pending job
@@ -51,7 +52,7 @@ func TestJobQueue_SyncJob(t *testing.T) {
})

t.Run("already in-progress", func(t *testing.T) {
q := NewJobQueue()
q := NewJobQueue(log.NewNopLogger(), nil)
job := types.NewJob(1, types.Offsets{Min: 100, Max: 200})

// First sync to put in in-progress
@@ -72,7 +73,7 @@ func TestJobQueue_SyncJob(t *testing.T) {

func TestJobQueue_MarkComplete(t *testing.T) {
t.Run("in-progress to complete", func(t *testing.T) {
q := NewJobQueue()
q := NewJobQueue(log.NewNopLogger(), nil)
job := types.NewJob(1, types.Offsets{Min: 100, Max: 200})

// Start with in-progress job
@@ -102,7 +103,7 @@ func TestJobQueue_MarkComplete(t *testing.T) {
})

t.Run("pending to complete", func(t *testing.T) {
q := NewJobQueue()
q := NewJobQueue(log.NewNopLogger(), nil)
job := types.NewJob(1, types.Offsets{Min: 100, Max: 200})

// Start with pending job
@@ -129,7 +130,7 @@ func TestJobQueue_MarkComplete(t *testing.T) {
})

t.Run("non-existent job", func(t *testing.T) {
q := NewJobQueue()
q := NewJobQueue(log.NewNopLogger(), nil)
logger := &testLogger{t: t}
q.logger = logger

@@ -138,7 +139,7 @@ func TestJobQueue_MarkComplete(t *testing.T) {
})

t.Run("already completed job", func(t *testing.T) {
q := NewJobQueue()
q := NewJobQueue(log.NewNopLogger(), nil)
logger := &testLogger{t: t}
q.logger = logger

19 changes: 13 additions & 6 deletions pkg/blockbuilder/scheduler/scheduler.go
@@ -24,11 +24,12 @@ var (
)

type Config struct {
ConsumerGroup string `yaml:"consumer_group"`
Interval time.Duration `yaml:"interval"`
LookbackPeriod time.Duration `yaml:"lookback_period"`
Strategy string `yaml:"strategy"`
TargetRecordCount int64 `yaml:"target_record_count"`
ConsumerGroup string `yaml:"consumer_group"`
Interval time.Duration `yaml:"interval"`
LookbackPeriod time.Duration `yaml:"lookback_period"`
Strategy string `yaml:"strategy"`
TargetRecordCount int64 `yaml:"target_record_count"`
MaxJobsPlannedPerInterval int `yaml:"max_jobs_planned_per_interval"`
}

func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
@@ -53,6 +54,12 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
RecordCountStrategy,
),
)
f.IntVar(
&cfg.MaxJobsPlannedPerInterval,
prefix+"max-jobs-planned-per-interval",
100,
"Maximum number of jobs that the planner can return.",
)
}

func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
@@ -143,7 +150,7 @@ func (s *BlockScheduler) runOnce(ctx context.Context) error {

s.publishLagMetrics(lag)

jobs, err := s.planner.Plan(ctx)
jobs, err := s.planner.Plan(ctx, s.cfg.MaxJobsPlannedPerInterval)
if err != nil {
level.Error(s.logger).Log("msg", "failed to plan jobs", "err", err)
}
2 changes: 1 addition & 1 deletion pkg/blockbuilder/scheduler/scheduler_test.go
@@ -41,7 +41,7 @@ func (m *mockOffsetManager) Commit(_ context.Context, _ int32, _ int64) error {
}

func newTestEnv(builderID string) (*testEnv, error) {
queue := NewJobQueue()
queue := NewJobQueue(log.NewNopLogger(), nil)
mockOffsetMgr := &mockOffsetManager{
topic: "test-topic",
consumerGroup: "test-group",
21 changes: 14 additions & 7 deletions pkg/blockbuilder/scheduler/strategy.go
@@ -19,7 +19,7 @@ type OffsetReader interface {

type Planner interface {
Name() string
Plan(ctx context.Context) ([]*JobWithMetadata, error)
Plan(ctx context.Context, maxJobsPerPartition int) ([]*JobWithMetadata, error)
}

const (
Expand Down Expand Up @@ -51,7 +51,7 @@ func (p *RecordCountPlanner) Name() string {
return RecordCountStrategy
}

func (p *RecordCountPlanner) Plan(ctx context.Context) ([]*JobWithMetadata, error) {
func (p *RecordCountPlanner) Plan(ctx context.Context, maxJobsPerPartition int) ([]*JobWithMetadata, error) {
offsets, err := p.offsetReader.GroupLag(ctx, p.lookbackPeriod)
if err != nil {
level.Error(p.logger).Log("msg", "failed to get group lag", "err", err)
@@ -60,20 +60,26 @@ func (p *RecordCountPlanner) Plan(ctx context.Context) ([]*JobWithMetadata, error) {

jobs := make([]*JobWithMetadata, 0, len(offsets))
for _, partitionOffset := range offsets {
// kadm.GroupMemberLag contains valid Commit.At even when consumer group never committed any offset.
// no additional validation is needed here
startOffset := partitionOffset.Commit.At + 1
// 1. kadm.GroupMemberLag contains valid Commit.At even when consumer group never committed any offset.
// no additional validation is needed here
// 2. committed offset could be behind start offset if we are falling behind retention period.
startOffset := max(partitionOffset.Commit.At+1, partitionOffset.Start.Offset)
endOffset := partitionOffset.End.Offset

// Skip if there's no lag
if startOffset >= endOffset {
continue
}

var jobCount int
currentStart := startOffset
// Create jobs of size targetRecordCount until we reach endOffset
for currentStart := startOffset; currentStart < endOffset; {
currentEnd := min(currentStart+p.targetRecordCount, endOffset)
for currentStart < endOffset {
if maxJobsPerPartition > 0 && jobCount >= maxJobsPerPartition {
break
}

currentEnd := min(currentStart+p.targetRecordCount, endOffset)
job := NewJobWithMetadata(
types.NewJob(partitionOffset.Partition, types.Offsets{
Min: currentStart,
@@ -84,6 +90,7 @@ func (p *RecordCountPlanner) Plan(ctx context.Context) ([]*JobWithMetadata, error) {
jobs = append(jobs, job)

currentStart = currentEnd
jobCount++
}
}

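The planner now clamps each partition's start offset to max(committed offset + 1, partition start), so records that have already fallen out of retention are skipped, and it stops once maxJobsPerPartition jobs have been cut for a partition in a single planning pass (the value fed from -block-scheduler.max-jobs-planned-per-interval), deferring the remainder to the next interval. The standalone sketch below reproduces that arithmetic with made-up offsets; planOffsets and the numbers are illustrative, not the actual Loki code.

```go
// Standalone illustration of the clamp-and-cap planning loop; not the actual
// RecordCountPlanner, just the same arithmetic on hypothetical offsets.
package main

import "fmt"

type span struct{ min, max int64 }

func planOffsets(committedAt, retentionStart, end, targetRecords int64, maxJobs int) []span {
	// Clamp: the committed offset may lag behind the partition's retention floor.
	start := committedAt + 1
	if retentionStart > start {
		start = retentionStart
	}

	var jobs []span
	for cur := start; cur < end; {
		if maxJobs > 0 && len(jobs) >= maxJobs {
			break // cap reached; the rest is planned in the next interval
		}
		jobEnd := cur + targetRecords
		if jobEnd > end {
			jobEnd = end
		}
		jobs = append(jobs, span{min: cur, max: jobEnd})
		cur = jobEnd
	}
	return jobs
}

func main() {
	// Committed at 149, retention has advanced the partition start to 300,
	// end offset 1000, target 200 records per job, cap of 2 jobs:
	// the planner emits [300,500) and [500,700) and leaves the rest for later.
	fmt.Println(planOffsets(149, 300, 1000, 200, 2))
}
```
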
3 changes: 1 addition & 2 deletions pkg/blockbuilder/scheduler/strategy_test.go
@@ -147,8 +147,7 @@ func TestRecordCountPlanner_Plan(t *testing.T) {
}
require.NoError(t, cfg.Validate())
planner := NewRecordCountPlanner(mockReader, tc.recordCount, time.Hour, log.NewNopLogger())

jobs, err := planner.Plan(context.Background())
jobs, err := planner.Plan(context.Background(), 0)
require.NoError(t, err)

require.Equal(t, len(tc.expectedJobs), len(jobs))
2 changes: 1 addition & 1 deletion pkg/loki/modules.go
@@ -1879,7 +1879,7 @@ func (t *Loki) initBlockScheduler() (services.Service, error) {

s, err := blockscheduler.NewScheduler(
t.Cfg.BlockScheduler,
blockscheduler.NewJobQueueWithLogger(logger),
blockscheduler.NewJobQueue(logger, prometheus.DefaultRegisterer),
offsetManager,
logger,
prometheus.DefaultRegisterer,
