diff --git a/pkg/blockbuilder/builder/builder.go b/pkg/blockbuilder/builder/builder.go
index d8784bf568d99..ad981e3183d0e 100644
--- a/pkg/blockbuilder/builder/builder.go
+++ b/pkg/blockbuilder/builder/builder.go
@@ -250,13 +250,13 @@ func (i *BlockBuilder) runOne(ctx context.Context, workerID string) (bool, error
 	logger := log.With(
 		i.logger,
 		"worker_id", workerID,
-		"partition", job.Partition,
-		"job_min_offset", job.Offsets.Min,
-		"job_max_offset", job.Offsets.Max,
+		"partition", job.Partition(),
+		"job_min_offset", job.Offsets().Min,
+		"job_max_offset", job.Offsets().Max,
 	)
 
 	i.jobsMtx.Lock()
-	i.inflightJobs[job.ID] = job
+	i.inflightJobs[job.ID()] = job
 	i.metrics.inflightJobs.Set(float64(len(i.inflightJobs)))
 	i.jobsMtx.Unlock()
 
@@ -284,7 +284,7 @@ func (i *BlockBuilder) runOne(ctx context.Context, workerID string) (bool, error
 	}
 
 	i.jobsMtx.Lock()
-	delete(i.inflightJobs, job.ID)
+	delete(i.inflightJobs, job.ID())
 	i.metrics.inflightJobs.Set(float64(len(i.inflightJobs)))
 	i.jobsMtx.Unlock()
 
@@ -315,7 +315,7 @@ func (i *BlockBuilder) processJob(ctx context.Context, job *types.Job, logger lo
 		"load records",
 		1,
 		func(ctx context.Context) error {
-			lastOffset, err = i.loadRecords(ctx, job.Partition, job.Offsets, inputCh)
+			lastOffset, err = i.loadRecords(ctx, job.Partition(), job.Offsets(), inputCh)
 			return err
 		},
 		func(ctx context.Context) error {
@@ -323,7 +323,7 @@ func (i *BlockBuilder) processJob(ctx context.Context, job *types.Job, logger lo
 				"msg", "finished loading records",
 				"ctx_error", ctx.Err(),
 				"last_offset", lastOffset,
-				"total_records", lastOffset-job.Offsets.Min,
+				"total_records", lastOffset-job.Offsets().Min,
 			)
 			close(inputCh)
 			return nil
@@ -488,7 +488,7 @@ func (i *BlockBuilder) processJob(ctx context.Context, job *types.Job, logger lo
 		}
 	}
 
-	if lastOffset <= job.Offsets.Min {
+	if lastOffset <= job.Offsets().Min {
 		return lastOffset, nil
 	}
 
diff --git a/pkg/blockbuilder/scheduler/prioritiy_queue_test.go b/pkg/blockbuilder/scheduler/prioritiy_queue_test.go
index b27d950aa04b0..6845e29dbbb3d 100644
--- a/pkg/blockbuilder/scheduler/prioritiy_queue_test.go
+++ b/pkg/blockbuilder/scheduler/prioritiy_queue_test.go
@@ -7,7 +7,7 @@ import (
 )
 
 func TestPriorityQueue(t *testing.T) {
-	t.Run("operations", func(t *testing.T) {
+	t.Run("basic operations", func(t *testing.T) {
 		tests := []struct {
 			name     string
 			input    []int
@@ -33,16 +33,14 @@
 				input:    []int{3, 1, 2},
 				wantPops: []int{1, 2, 3},
 			},
-			{
-				name:     "duplicate elements",
-				input:    []int{2, 1, 2, 1},
-				wantPops: []int{1, 1, 2, 2},
-			},
 		}
 
 		for _, tt := range tests {
 			t.Run(tt.name, func(t *testing.T) {
-				pq := NewPriorityQueue[int](func(a, b int) bool { return a < b })
+				pq := NewPriorityQueue[int, int](
+					func(a, b int) bool { return a < b },
+					func(v int) int { return v },
+				)
 				require.Equal(t, 0, pq.Len())
 
 				// Push all elements
@@ -69,15 +67,73 @@
 		}
 	})
 
+	t.Run("key operations", func(t *testing.T) {
+		type Job struct {
+			ID       string
+			Priority int
+		}
+
+		pq := NewPriorityQueue[string, Job](
+			func(a, b Job) bool { return a.Priority < b.Priority },
+			func(j Job) string { return j.ID },
+		)
+
+		// Test Push with duplicate key
+		job1 := Job{ID: "job1", Priority: 1}
+		job1Updated := Job{ID: "job1", Priority: 3}
+		job2 := Job{ID: "job2", Priority: 2}
+
+		pq.Push(job1)
+		require.Equal(t, 1, pq.Len())
+
+		// Push with same key should update
+		pq.Push(job1Updated)
+		require.Equal(t, 1, pq.Len())
+
+		// Verify updated priority
+		v, ok := pq.Lookup("job1")
+		require.True(t, ok)
+		require.Equal(t, job1Updated, v)
+
+		// Test Remove
+		pq.Push(job2)
+		v, ok = pq.Remove("job1")
+		require.True(t, ok)
+		require.Equal(t, job1Updated, v)
+		require.Equal(t, 1, pq.Len())
+
+		// Test UpdatePriority
+		newJob2 := Job{ID: "job2", Priority: 4}
+		ok = pq.UpdatePriority("job2", newJob2)
+		require.True(t, ok)
+
+		v, ok = pq.Lookup("job2")
+		require.True(t, ok)
+		require.Equal(t, newJob2, v)
+
+		// Test non-existent key operations
+		v, ok = pq.Lookup("nonexistent")
+		require.False(t, ok)
+		require.Zero(t, v)
+
+		v, ok = pq.Remove("nonexistent")
+		require.False(t, ok)
+		require.Zero(t, v)
+
+		ok = pq.UpdatePriority("nonexistent", Job{})
+		require.False(t, ok)
+	})
+
 	t.Run("custom type", func(t *testing.T) {
 		type Job struct {
 			ID       string
 			Priority int
 		}
 
-		pq := NewPriorityQueue[Job](func(a, b Job) bool {
-			return a.Priority < b.Priority
-		})
+		pq := NewPriorityQueue[string, Job](
+			func(a, b Job) bool { return a.Priority < b.Priority },
+			func(j Job) string { return j.ID },
+		)
 
 		jobs := []Job{
 			{ID: "high", Priority: 3},
@@ -102,25 +158,28 @@
 	})
 
 	t.Run("mixed operations", func(t *testing.T) {
-		pq := NewPriorityQueue[int](func(a, b int) bool { return a < b })
+		pq := NewPriorityQueue[int, int](
+			func(a, b int) bool { return a < b },
+			func(v int) int { return v },
+		)
 
 		// Push some elements
 		pq.Push(3)
 		pq.Push(1)
-		require.Equal(t, 2, pq.Len())
+		pq.Push(4)
 
-		// Pop lowest
+		// Pop an element
 		v, ok := pq.Pop()
 		require.True(t, ok)
 		require.Equal(t, 1, v)
 
 		// Push more elements
 		pq.Push(2)
-		pq.Push(4)
+		pq.Push(5)
 
-		// Verify remaining elements come out in order
-		want := []int{2, 3, 4}
-		got := make([]int, 0, 3)
+		// Pop remaining elements and verify order
+		want := []int{2, 3, 4, 5}
+		got := make([]int, 0, len(want))
 		for range want {
 			v, ok := pq.Pop()
 			require.True(t, ok)
@@ -191,3 +250,54 @@
 		})
 	}
 }
+
+func TestCircularBufferLookup(t *testing.T) {
+	t.Run("empty buffer", func(t *testing.T) {
+		cb := NewCircularBuffer[int](5)
+		_, ok := cb.Lookup(func(i int) bool { return i == 1 })
+		require.False(t, ok)
+	})
+
+	t.Run("single element", func(t *testing.T) {
+		cb := NewCircularBuffer[int](5)
+		cb.Push(1)
+		v, ok := cb.Lookup(func(i int) bool { return i == 1 })
+		require.True(t, ok)
+		require.Equal(t, 1, v)
+	})
+
+	t.Run("multiple elements", func(t *testing.T) {
+		cb := NewCircularBuffer[int](5)
+		for i := 1; i <= 3; i++ {
+			cb.Push(i)
+		}
+		v, ok := cb.Lookup(func(i int) bool { return i == 2 })
+		require.True(t, ok)
+		require.Equal(t, 2, v)
+	})
+
+	t.Run("wrapped buffer", func(t *testing.T) {
+		cb := NewCircularBuffer[int](3)
+		// Push 5 elements into a buffer of size 3, causing wrap-around
+		for i := 1; i <= 5; i++ {
+			cb.Push(i)
+		}
+		// Buffer should now contain [4,5,3] with head at index 2
+		v, ok := cb.Lookup(func(i int) bool { return i == 4 })
+		require.True(t, ok)
+		require.Equal(t, 4, v)
+
+		// Element that was evicted should not be found
+		_, ok = cb.Lookup(func(i int) bool { return i == 1 })
+		require.False(t, ok)
+	})
+
+	t.Run("no match", func(t *testing.T) {
+		cb := NewCircularBuffer[int](5)
+		for i := 1; i <= 3; i++ {
+			cb.Push(i)
+		}
+		_, ok := cb.Lookup(func(i int) bool { return i == 99 })
+		require.False(t, ok)
+	})
+}
diff --git a/pkg/blockbuilder/scheduler/priority_queue.go b/pkg/blockbuilder/scheduler/priority_queue.go
index 3b488716cabe8..86b2c795f2eb2 100644
--- a/pkg/blockbuilder/scheduler/priority_queue.go
+++ b/pkg/blockbuilder/scheduler/priority_queue.go
@@ -4,82 +4,142 @@ import (
 	"container/heap"
 )
 
-// PriorityQueue is a generic priority queue.
-type PriorityQueue[T any] struct {
-	h *priorityHeap[T]
+// PriorityQueue is a generic priority queue with constant time lookups.
+type PriorityQueue[K comparable, V any] struct {
+	h   *priorityHeap[V]
+	m   map[K]*item[V] // Map for constant time lookups
+	key func(V) K      // Function to extract key from value
+}
+
+// item represents an item in the priority queue with its index
+type item[V any] struct {
+	value V
+	index int
 }
 
 // NewPriorityQueue creates a new priority queue.
-func NewPriorityQueue[T any](less func(T, T) bool) *PriorityQueue[T] {
-	h := &priorityHeap[T]{
+func NewPriorityQueue[K comparable, V any](less func(V, V) bool, key func(V) K) *PriorityQueue[K, V] {
+	h := &priorityHeap[V]{
 		less: less,
-		heap: make([]T, 0),
+		heap: make([]*item[V], 0),
 	}
 	heap.Init(h)
-	return &PriorityQueue[T]{h: h}
+	return &PriorityQueue[K, V]{
+		h:   h,
+		m:   make(map[K]*item[V]),
+		key: key,
+	}
 }
 
 // Push adds an element to the queue.
-func (pq *PriorityQueue[T]) Push(v T) {
-	heap.Push(pq.h, v)
+func (pq *PriorityQueue[K, V]) Push(v V) {
+	k := pq.key(v)
+	if existing, ok := pq.m[k]; ok {
+		// Update existing item's value and fix heap
+		existing.value = v
+		heap.Fix(pq.h, existing.index)
+		return
+	}
+
+	// Add new item
+	it := &item[V]{value: v}
+	pq.m[k] = it
+	heap.Push(pq.h, it)
 }
 
 // Pop removes and returns the element with the highest priority from the queue.
-func (pq *PriorityQueue[T]) Pop() (T, bool) {
+func (pq *PriorityQueue[K, V]) Pop() (V, bool) {
 	if pq.Len() == 0 {
-		var zero T
+		var zero V
 		return zero, false
 	}
-	return heap.Pop(pq.h).(T), true
+	it := heap.Pop(pq.h).(*item[V])
+	delete(pq.m, pq.key(it.value))
+	return it.value, true
+}
+
+// Lookup returns the item with the given key if it exists.
+func (pq *PriorityQueue[K, V]) Lookup(k K) (V, bool) {
+	if it, ok := pq.m[k]; ok {
+		return it.value, true
+	}
+	var zero V
+	return zero, false
+}
+
+// Remove removes and returns the item with the given key if it exists.
+func (pq *PriorityQueue[K, V]) Remove(k K) (V, bool) {
+	it, ok := pq.m[k]
+	if !ok {
+		var zero V
+		return zero, false
+	}
+	heap.Remove(pq.h, it.index)
+	delete(pq.m, k)
+	return it.value, true
+}
+
+// UpdatePriority updates the priority of an item and reorders the queue.
+func (pq *PriorityQueue[K, V]) UpdatePriority(k K, v V) bool {
+	if it, ok := pq.m[k]; ok {
+		it.value = v
+		heap.Fix(pq.h, it.index)
+		return true
+	}
+	return false
 }
 
 // Len returns the number of elements in the queue.
-func (pq *PriorityQueue[T]) Len() int {
+func (pq *PriorityQueue[K, V]) Len() int {
 	return pq.h.Len()
 }
 
 // priorityHeap is the internal heap implementation that satisfies heap.Interface.
-type priorityHeap[T any] struct {
-	less func(T, T) bool
-	heap []T
+type priorityHeap[V any] struct {
+	less func(V, V) bool
+	heap []*item[V]
 }
 
-func (h *priorityHeap[T]) Len() int {
+func (h *priorityHeap[V]) Len() int {
 	return len(h.heap)
 }
 
-func (h *priorityHeap[T]) Less(i, j int) bool {
-	return h.less(h.heap[i], h.heap[j])
+func (h *priorityHeap[V]) Less(i, j int) bool {
+	return h.less(h.heap[i].value, h.heap[j].value)
 }
 
-func (h *priorityHeap[T]) Swap(i, j int) {
+func (h *priorityHeap[V]) Swap(i, j int) {
 	h.heap[i], h.heap[j] = h.heap[j], h.heap[i]
+	h.heap[i].index = i
+	h.heap[j].index = j
 }
 
-func (h *priorityHeap[T]) Push(x any) {
-	h.heap = append(h.heap, x.(T))
+func (h *priorityHeap[V]) Push(x any) {
+	it := x.(*item[V])
+	it.index = len(h.heap)
+	h.heap = append(h.heap, it)
 }
 
-func (h *priorityHeap[T]) Pop() any {
+func (h *priorityHeap[V]) Pop() any {
 	old := h.heap
 	n := len(old)
-	x := old[n-1]
+	it := old[n-1]
 	h.heap = old[0 : n-1]
-	return x
+	return it
 }
 
 // CircularBuffer is a generic circular buffer.
-type CircularBuffer[T any] struct {
-	buffer []T
+type CircularBuffer[V any] struct {
+	buffer []V
 	size   int
 	head   int
 	tail   int
 }
 
 // NewCircularBuffer creates a new circular buffer with the given capacity.
-func NewCircularBuffer[T any](capacity int) *CircularBuffer[T] {
-	return &CircularBuffer[T]{
-		buffer: make([]T, capacity),
+func NewCircularBuffer[V any](capacity int) *CircularBuffer[V] {
+	return &CircularBuffer[V]{
+		buffer: make([]V, capacity),
 		size:   0,
 		head:   0,
 		tail:   0,
@@ -87,8 +147,8 @@ func NewCircularBuffer[T any](capacity int) *CircularBuffer[T] {
 }
 
 // Push adds an element to the circular buffer and returns the evicted element if any
-func (b *CircularBuffer[T]) Push(v T) (T, bool) {
-	var evicted T
+func (b *CircularBuffer[V]) Push(v V) (V, bool) {
+	var evicted V
 	hasEvicted := false
 
 	if b.size == len(b.buffer) {
@@ -107,9 +167,9 @@
 }
 
 // Pop removes and returns the oldest element from the buffer
-func (b *CircularBuffer[T]) Pop() (T, bool) {
+func (b *CircularBuffer[V]) Pop() (V, bool) {
 	if b.size == 0 {
-		var zero T
+		var zero V
 		return zero, false
 	}
 
@@ -121,6 +181,19 @@
 }
 
 // Len returns the number of elements in the buffer
-func (b *CircularBuffer[T]) Len() int {
+func (b *CircularBuffer[V]) Len() int {
 	return b.size
 }
+
+// returns the first element in the buffer that satisfies the given predicate
+func (b *CircularBuffer[V]) Lookup(f func(V) bool) (V, bool) {
+	for i := 0; i < b.size; i++ {
+		idx := (b.head + i) % len(b.buffer)
+		if f(b.buffer[idx]) {
+			return b.buffer[idx], true
+		}
+
+	}
+	var zero V
+	return zero, false
+}
diff --git a/pkg/blockbuilder/scheduler/queue.go b/pkg/blockbuilder/scheduler/queue.go
index dab46f164908d..1aeb15e8395e5 100644
--- a/pkg/blockbuilder/scheduler/queue.go
+++ b/pkg/blockbuilder/scheduler/queue.go
@@ -5,138 +5,233 @@ import (
 	"sync"
 	"time"
 
+	"github.com/go-kit/log"
+	"github.com/go-kit/log/level"
+
 	"github.com/grafana/loki/v3/pkg/blockbuilder/types"
 )
 
 const (
+	DefaultPriority              = 0 // TODO(owen-d): better determine priority when unknown
 	defaultCompletedJobsCapacity = 100
 )
 
-// JobWithPriority wraps a job with a priority value
-type JobWithPriority[T comparable] struct {
-	Job      *types.Job
-	Priority T
+// JobWithMetadata wraps a job with additional metadata for tracking its lifecycle
+type JobWithMetadata struct {
+	*types.Job
+	Priority   int
+	Status     types.JobStatus
+	StartTime  time.Time
+	UpdateTime time.Time
 }
 
-// NewJobWithPriority creates a new JobWithPriority instance
-func NewJobWithPriority[T comparable](job *types.Job, priority T) *JobWithPriority[T] {
-	return &JobWithPriority[T]{
-		Job:      job,
-		Priority: priority,
+// NewJobWithMetadata creates a new JobWithMetadata instance
+func NewJobWithMetadata(job *types.Job, priority int) *JobWithMetadata {
+	return &JobWithMetadata{
+		Job:        job,
+		Priority:   priority,
+		Status:     types.JobStatusPending,
+		UpdateTime: time.Now(),
 	}
 }
 
-// inProgressJob contains a job and its start time
-type inProgressJob struct {
-	job       *types.Job
-	startTime time.Time
-}
-
-// Duration returns how long the job has been running
-func (j *inProgressJob) Duration() time.Duration {
-	return time.Since(j.startTime)
-}
-
 // JobQueue manages the queue of pending jobs and tracks their state.
 type JobQueue struct {
-	pending    *PriorityQueue[*JobWithPriority[int]] // Jobs waiting to be processed, ordered by priority
-	inProgress map[string]*inProgressJob             // Jobs currently being processed, key is job ID
-	completed  *CircularBuffer[*types.Job]           // Last N completed jobs
-	statusMap  map[string]types.JobStatus            // Maps job ID to its current status
+	logger     log.Logger
+	pending    *PriorityQueue[string, *JobWithMetadata] // Jobs waiting to be processed, ordered by priority
+	inProgress map[string]*JobWithMetadata              // Jobs currently being processed
+	completed  *CircularBuffer[*JobWithMetadata]        // Last N completed jobs
+	statusMap  map[string]types.JobStatus               // Maps job ID to its current status
 	mu         sync.RWMutex
 }
 
-// NewJobQueue creates a new job queue instance
-func NewJobQueue() *JobQueue {
+func NewJobQueueWithLogger(logger log.Logger) *JobQueue {
 	return &JobQueue{
-		pending: NewPriorityQueue[*JobWithPriority[int]](func(a, b *JobWithPriority[int]) bool {
-			return a.Priority > b.Priority // Higher priority first
-		}),
-		inProgress: make(map[string]*inProgressJob),
-		completed:  NewCircularBuffer[*types.Job](defaultCompletedJobsCapacity),
+		logger: logger,
+		pending: NewPriorityQueue(
+			func(a, b *JobWithMetadata) bool {
+				return a.Priority > b.Priority // Higher priority first
+			},
+			func(j *JobWithMetadata) string { return j.ID() },
+		),
+		inProgress: make(map[string]*JobWithMetadata),
+		completed:  NewCircularBuffer[*JobWithMetadata](defaultCompletedJobsCapacity),
 		statusMap:  make(map[string]types.JobStatus),
 	}
 }
 
+// NewJobQueue creates a new job queue instance
+func NewJobQueue() *JobQueue {
+	return NewJobQueueWithLogger(log.NewNopLogger())
+}
+
+// Exists checks if a job exists in any state and returns its status
 func (q *JobQueue) Exists(job *types.Job) (types.JobStatus, bool) {
 	q.mu.RLock()
 	defer q.mu.RUnlock()
 
-	status, exists := q.statusMap[job.ID]
-	return status, exists
+	x, ok := q.existsLockLess(job.ID())
+	if !ok {
+		return types.JobStatusUnknown, false
+	}
+	return x.Status, ok
 }
 
-// Enqueue adds a new job to the pending queue with a priority
+func (q *JobQueue) existsLockLess(id string) (*JobWithMetadata, bool) {
+	status, ok := q.statusMap[id]
+	if !ok {
+		return nil, false
+	}
+
+	switch status {
+	case types.JobStatusPending:
+		return q.pending.Lookup(id)
+	case types.JobStatusInProgress:
+		res, ok := q.inProgress[id]
+		return res, ok
+	case types.JobStatusComplete:
+		return q.completed.Lookup(func(jwm *JobWithMetadata) bool {
+			return jwm.ID() == id
+		})
+	default:
+		return nil, false
+	}
+}
+
+// Enqueue adds a job to the pending queue with the given priority
 func (q *JobQueue) Enqueue(job *types.Job, priority int) error {
 	q.mu.Lock()
 	defer q.mu.Unlock()
 
 	// Check if job already exists
-	if status, exists := q.statusMap[job.ID]; exists {
-		return fmt.Errorf("job %s already exists with status %v", job.ID, status)
+	if status, exists := q.statusMap[job.ID()]; exists {
+		return fmt.Errorf("job %s already exists with status %v", job.ID(), status)
 	}
 
-	jobWithPriority := NewJobWithPriority(job, priority)
-	q.pending.Push(jobWithPriority)
-	q.statusMap[job.ID] = types.JobStatusPending
+	jobMeta := NewJobWithMetadata(job, priority)
+	q.pending.Push(jobMeta)
+	q.statusMap[job.ID()] = types.JobStatusPending
 	return nil
 }
 
-// Dequeue gets the next available job and assigns it to a builder
-func (q *JobQueue) Dequeue(_ string) (*types.Job, bool, error) {
+// Dequeue removes and returns the highest priority job from the pending queue
+func (q *JobQueue) Dequeue() (*types.Job, bool) {
 	q.mu.Lock()
 	defer q.mu.Unlock()
 
-	if q.pending.Len() == 0 {
-		return nil, false, nil
-	}
-
-	jobWithPriority, ok := q.pending.Pop()
+	jobMeta, ok := q.pending.Pop()
 	if !ok {
-		return nil, false, nil
+		return nil, false
 	}
 
-	// Add to in-progress with current time
-	q.inProgress[jobWithPriority.Job.ID] = &inProgressJob{
-		job:       jobWithPriority.Job,
-		startTime: time.Now(),
+	// Update metadata for in-progress state
+	jobMeta.Status = types.JobStatusInProgress
+	jobMeta.StartTime = time.Now()
+	jobMeta.UpdateTime = jobMeta.StartTime
+
+	q.inProgress[jobMeta.ID()] = jobMeta
+	q.statusMap[jobMeta.ID()] = types.JobStatusInProgress
+
+	return jobMeta.Job, true
+}
+
+// GetInProgressJob retrieves a job that is currently being processed
+func (q *JobQueue) GetInProgressJob(id string) (*types.Job, time.Time, bool) {
+	q.mu.RLock()
+	defer q.mu.RUnlock()
+
+	if jobMeta, ok := q.inProgress[id]; ok {
+		return jobMeta.Job, jobMeta.StartTime, true
 	}
-	q.statusMap[jobWithPriority.Job.ID] = types.JobStatusInProgress
+	return nil, time.Time{}, false
+}
 
-	return jobWithPriority.Job, true, nil
+// RemoveInProgress removes a job from the in-progress map
+func (q *JobQueue) RemoveInProgress(id string) {
+	q.mu.Lock()
+	defer q.mu.Unlock()
+
+	delete(q.inProgress, id)
 }
 
-// MarkComplete moves a job from in-progress to completed
-func (q *JobQueue) MarkComplete(jobID string) {
+// MarkComplete moves a job from in-progress to completed with the given status
+func (q *JobQueue) MarkComplete(id string, status types.JobStatus) {
 	q.mu.Lock()
 	defer q.mu.Unlock()
 
-	// Find job in in-progress map
-	inProgressJob, exists := q.inProgress[jobID]
-	// if it doesn't exist, it could be previously removed (duplicate job execution)
-	// or the scheduler may have restarted and not have the job state anymore.
-	if exists {
-		// Remove from in-progress
-		delete(q.inProgress, jobID)
+	jobMeta, ok := q.existsLockLess(id)
+	if !ok {
+		level.Error(q.logger).Log("msg", "failed to mark job as complete", "job", id, "status", status)
+		return
 	}
 
-	// Add to completed buffer and handle evicted job
-	if evictedJob, hasEvicted := q.completed.Push(inProgressJob.job); hasEvicted {
-		// Remove evicted job from status map
-		delete(q.statusMap, evictedJob.ID)
+	switch jobMeta.Status {
+	case types.JobStatusInProgress:
+		// update & remove from in progress
+		delete(q.inProgress, id)
+	case types.JobStatusPending:
+		_, ok := q.pending.Remove(id)
+		if !ok {
+			level.Error(q.logger).Log("msg", "failed to remove job from pending queue", "job", id)
+		}
+	default:
+		level.Error(q.logger).Log("msg", "unknown job status, cannot mark as complete", "job", id, "status", status)
 	}
-	q.statusMap[jobID] = types.JobStatusComplete
+
+	jobMeta.Status = status
+	jobMeta.UpdateTime = time.Now()
+
+	// add it to the completed buffer, removing any evicted job from the statusMap
+	removal, evicted := q.completed.Push(jobMeta)
+	if evicted {
+		delete(q.statusMap, removal.ID())
+	}
+	q.statusMap[id] = status
 }
 
-// SyncJob registers a job as in-progress, used for restoring state after scheduler restarts
-func (q *JobQueue) SyncJob(jobID string, _ string, job *types.Job) {
+// SyncJob registers a job as in-progress or updates its UpdateTime if already in progress
+func (q *JobQueue) SyncJob(jobID string, job *types.Job) {
 	q.mu.Lock()
 	defer q.mu.Unlock()
 
-	// Add directly to in-progress
-	q.inProgress[jobID] = &inProgressJob{
-		job:       job,
-		startTime: time.Now(),
+	// Helper function to create a new job
+	registerInProgress := func() {
+		// Job does not exist; add it as in-progress
+		now := time.Now()
+		jobMeta := NewJobWithMetadata(job, DefaultPriority)
+		jobMeta.StartTime = now
+		jobMeta.UpdateTime = now
+		jobMeta.Status = types.JobStatusInProgress
+		q.inProgress[jobID] = jobMeta
+		q.statusMap[jobID] = types.JobStatusInProgress
+	}
+
+	jobMeta, ok := q.existsLockLess(jobID)
+
+	if !ok {
+		registerInProgress()
+		return
 	}
+
+	switch jobMeta.Status {
+	case types.JobStatusPending:
+		// Job already pending, move to in-progress
+		_, ok := q.pending.Remove(jobID)
+		if !ok {
+			level.Error(q.logger).Log("msg", "failed to remove job from pending queue", "job", jobID)
+		}
+		jobMeta.Status = types.JobStatusInProgress
+	case types.JobStatusInProgress:
+	case types.JobStatusComplete, types.JobStatusFailed, types.JobStatusExpired:
+		// Job already completed, re-enqueue a new one
+		registerInProgress()
+		return
+	default:
+		registerInProgress()
+		return
+	}
+
+	jobMeta.UpdateTime = time.Now()
+	q.inProgress[jobID] = jobMeta
 	q.statusMap[jobID] = types.JobStatusInProgress
 }
diff --git a/pkg/blockbuilder/scheduler/queue_test.go b/pkg/blockbuilder/scheduler/queue_test.go
new file mode 100644
index 0000000000000..dfbe07681c62a
--- /dev/null
+++ b/pkg/blockbuilder/scheduler/queue_test.go
@@ -0,0 +1,163 @@
+package scheduler
+
+import (
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+
+	"github.com/grafana/loki/v3/pkg/blockbuilder/types"
+)
+
+func TestJobQueue_SyncJob(t *testing.T) {
+	t.Run("non-existent to in-progress", func(t *testing.T) {
+		q := NewJobQueue()
+		job := types.NewJob(1, types.Offsets{Min: 100, Max: 200})
+		jobID := job.ID()
+
+		beforeSync := time.Now()
+		q.SyncJob(jobID, job)
+		afterSync := time.Now()
+
+		// Verify job is in in-progress map
+		jobMeta, ok := q.inProgress[jobID]
+		require.True(t, ok, "job should be in in-progress map")
+		require.Equal(t, types.JobStatusInProgress, jobMeta.Status)
+		require.True(t, jobMeta.StartTime.After(beforeSync) || jobMeta.StartTime.Equal(beforeSync))
+		require.True(t, jobMeta.StartTime.Before(afterSync) || jobMeta.StartTime.Equal(afterSync))
+	})
+
+	t.Run("pending to in-progress", func(t *testing.T) {
+		q := NewJobQueue()
+		job := types.NewJob(1, types.Offsets{Min: 100, Max: 200})
+
+		// Start with pending job
+		err := q.Enqueue(job, DefaultPriority)
+		require.NoError(t, err)
+
+		beforeSync := time.Now()
+		q.SyncJob(job.ID(), job)
+		afterSync := time.Now()
+
+		// Verify job moved from pending to in-progress
+		_, ok := q.pending.Lookup(job.ID())
+		require.False(t, ok, "job should not be in pending queue")
+
+		jobMeta, ok := q.inProgress[job.ID()]
+		require.True(t, ok, "job should be in in-progress map")
+		require.Equal(t, types.JobStatusInProgress, jobMeta.Status)
+		require.True(t, jobMeta.UpdateTime.After(beforeSync) || jobMeta.UpdateTime.Equal(beforeSync))
+		require.True(t, jobMeta.UpdateTime.Before(afterSync) || jobMeta.UpdateTime.Equal(afterSync))
+	})
+
+	t.Run("already in-progress", func(t *testing.T) {
+		q := NewJobQueue()
+		job := types.NewJob(1, types.Offsets{Min: 100, Max: 200})
+
+		// First sync to put in in-progress
+		q.SyncJob(job.ID(), job)
+		firstUpdate := q.inProgress[job.ID()].UpdateTime
+
+		time.Sleep(time.Millisecond) // Ensure time difference
+		beforeSecondSync := time.Now()
+		q.SyncJob(job.ID(), job)
+		afterSecondSync := time.Now()
+
+		jobMeta := q.inProgress[job.ID()]
+		require.True(t, jobMeta.UpdateTime.After(firstUpdate), "UpdateTime should be updated")
+		require.True(t, jobMeta.UpdateTime.After(beforeSecondSync) || jobMeta.UpdateTime.Equal(beforeSecondSync))
+		require.True(t, jobMeta.UpdateTime.Before(afterSecondSync) || jobMeta.UpdateTime.Equal(afterSecondSync))
+	})
+}
+
+func TestJobQueue_MarkComplete(t *testing.T) {
+	t.Run("in-progress to complete", func(t *testing.T) {
+		q := NewJobQueue()
+		job := types.NewJob(1, types.Offsets{Min: 100, Max: 200})
+
+		// Start with in-progress job
+		q.SyncJob(job.ID(), job)
+
+		beforeComplete := time.Now()
+		q.MarkComplete(job.ID(), types.JobStatusComplete)
+		afterComplete := time.Now()
+
+		// Verify job moved to completed buffer
+		var foundJob *JobWithMetadata
+		q.completed.Lookup(func(j *JobWithMetadata) bool {
+			if j.ID() == job.ID() {
+				foundJob = j
+				return true
+			}
+			return false
+		})
+		require.NotNil(t, foundJob, "job should be in completed buffer")
+		require.Equal(t, types.JobStatusComplete, foundJob.Status)
+		require.True(t, foundJob.UpdateTime.After(beforeComplete) || foundJob.UpdateTime.Equal(beforeComplete))
+		require.True(t, foundJob.UpdateTime.Before(afterComplete) || foundJob.UpdateTime.Equal(afterComplete))
+
+		// Verify removed from in-progress
+		_, ok := q.inProgress[job.ID()]
+		require.False(t, ok, "job should not be in in-progress map")
+	})
+
+	t.Run("pending to complete", func(t *testing.T) {
+		q := NewJobQueue()
+		job := types.NewJob(1, types.Offsets{Min: 100, Max: 200})
+
+		// Start with pending job
+		err := q.Enqueue(job, DefaultPriority)
+		require.NoError(t, err)
+
+		q.MarkComplete(job.ID(), types.JobStatusComplete)
+
+		// Verify job not in pending
+		_, ok := q.pending.Lookup(job.ID())
+		require.False(t, ok, "job should not be in pending queue")
+
+		// Verify job in completed buffer
+		var foundJob *JobWithMetadata
+		q.completed.Lookup(func(j *JobWithMetadata) bool {
+			if j.ID() == job.ID() {
+				foundJob = j
+				return true
+			}
+			return false
+		})
+		require.NotNil(t, foundJob, "job should be in completed buffer")
+		require.Equal(t, types.JobStatusComplete, foundJob.Status)
+	})
+
+	t.Run("non-existent job", func(t *testing.T) {
+		q := NewJobQueue()
+		logger := &testLogger{t: t}
+		q.logger = logger
+
+		q.MarkComplete("non-existent", types.JobStatusComplete)
+		// Should log error but not panic
+	})
+
+	t.Run("already completed job", func(t *testing.T) {
+		q := NewJobQueue()
+		logger := &testLogger{t: t}
+		q.logger = logger
+
+		job := types.NewJob(1, types.Offsets{Min: 100, Max: 200})
+		q.SyncJob(job.ID(), job)
+		q.MarkComplete(job.ID(), types.JobStatusComplete)
+
+		// Try to complete again
+		q.MarkComplete(job.ID(), types.JobStatusComplete)
+		// Should log error but not panic
+	})
+}
+
+// testLogger implements log.Logger for testing
+type testLogger struct {
+	t *testing.T
+}
+
+func (l *testLogger) Log(keyvals ...interface{}) error {
+	l.t.Log(keyvals...)
+	return nil
+}
diff --git a/pkg/blockbuilder/scheduler/scheduler.go b/pkg/blockbuilder/scheduler/scheduler.go
index 4eb1eaedde9ec..5e55e3123420d 100644
--- a/pkg/blockbuilder/scheduler/scheduler.go
+++ b/pkg/blockbuilder/scheduler/scheduler.go
@@ -144,14 +144,53 @@ func (s *BlockScheduler) runOnce(ctx context.Context) error {
 
 	for _, job := range jobs {
 		// TODO: end offset keeps moving each time we plan jobs, maybe we should not use it as part of the job ID
-		if status, ok := s.queue.Exists(job.Job); ok {
-			level.Debug(s.logger).Log("msg", "job already exists", "job", job, "status", status)
+
+		logger := log.With(
+			s.logger,
+			"job", job.Job.ID(),
+			"priority", job.Priority,
+		)
+
+		status, ok := s.queue.Exists(job.Job)
+
+		// scheduler is unaware of incoming job; enqueue
+		if !ok {
+			level.Debug(logger).Log(
+				"msg", "job does not exist, enqueueing",
+			)
+
+			// enqueue
+			if err := s.queue.Enqueue(job.Job, job.Priority); err != nil {
+				level.Error(logger).Log("msg", "failed to enqueue job", "err", err)
+			}
+
 			continue
 		}
 
-		if err := s.queue.Enqueue(job.Job, job.Priority); err != nil {
-			level.Error(s.logger).Log("msg", "failed to enqueue job", "job", job, "err", err)
+		// scheduler is aware of incoming job; handling depends on status
+		switch status {
+		case types.JobStatusPending:
+			level.Debug(s.logger).Log(
+				"msg", "job is pending, updating priority",
+				"old_priority", job.Priority,
+			)
+			s.queue.pending.UpdatePriority(job.Job.ID(), job)
+		case types.JobStatusInProgress:
+			level.Debug(s.logger).Log(
+				"msg", "job is in progress, ignoring",
+			)
+		case types.JobStatusComplete:
+			// shouldn't happen
+			level.Debug(s.logger).Log(
+				"msg", "job is complete, ignoring",
+			)
+		default:
+			level.Error(s.logger).Log(
+				"msg", "job has unknown status, ignoring",
+				"status", status,
+			)
 		}
+
 	}
 
 	return nil
@@ -165,22 +204,45 @@ func (s *BlockScheduler) publishLagMetrics(lag map[int32]kadm.GroupMemberLag) {
 	}
 }
 
-func (s *BlockScheduler) HandleGetJob(ctx context.Context, builderID string) (*types.Job, bool, error) {
+func (s *BlockScheduler) HandleGetJob(ctx context.Context) (*types.Job, bool, error) {
 	select {
 	case <-ctx.Done():
 		return nil, false, ctx.Err()
 	default:
-		return s.queue.Dequeue(builderID)
+		job, ok := s.queue.Dequeue()
+		return job, ok, nil
 	}
 }
 
-func (s *BlockScheduler) HandleCompleteJob(_ context.Context, _ string, job *types.Job, _ bool) error {
-	// TODO: handle commits
-	s.queue.MarkComplete(job.ID)
+func (s *BlockScheduler) HandleCompleteJob(ctx context.Context, job *types.Job, success bool) (err error) {
+	logger := log.With(s.logger, "job", job.ID())
+
+	if success {
+		if err = s.offsetManager.Commit(
+			ctx,
+			job.Partition(),
+			job.Offsets().Max-1, // max is exclusive, so commit max-1
+		); err == nil {
+			s.queue.MarkComplete(job.ID(), types.JobStatusComplete)
+			level.Info(logger).Log("msg", "job completed successfully")
+			return nil
+		}
+
+		level.Error(logger).Log("msg", "failed to commit offset", "err", err)
+	}
+
+	level.Error(logger).Log("msg", "job failed, re-enqueuing")
+	s.queue.MarkComplete(job.ID(), types.JobStatusFailed)
+	s.queue.pending.Push(
+		NewJobWithMetadata(
+			job,
+			DefaultPriority,
+		),
+	)
 	return nil
 }
 
-func (s *BlockScheduler) HandleSyncJob(_ context.Context, builderID string, job *types.Job) error {
-	s.queue.SyncJob(job.ID, builderID, job)
+func (s *BlockScheduler) HandleSyncJob(_ context.Context, job *types.Job) error {
+	s.queue.SyncJob(job.ID(), job)
 	return nil
 }
diff --git a/pkg/blockbuilder/scheduler/scheduler_test.go b/pkg/blockbuilder/scheduler/scheduler_test.go
index bc72d985f39b6..f13c6d49485c1 100644
--- a/pkg/blockbuilder/scheduler/scheduler_test.go
+++ b/pkg/blockbuilder/scheduler/scheduler_test.go
@@ -7,8 +7,10 @@ import (
 
 	"github.com/go-kit/log"
 	"github.com/prometheus/client_golang/prometheus"
+	"github.com/twmb/franz-go/pkg/kadm"
 
 	"github.com/grafana/loki/v3/pkg/blockbuilder/types"
+	"github.com/grafana/loki/v3/pkg/kafka/partition"
 )
 
 type testEnv struct {
@@ -18,9 +20,33 @@
 	builder   *Worker
 }
 
+type mockOffsetManager struct {
+	topic         string
+	consumerGroup string
+}
+
+func (m *mockOffsetManager) Topic() string         { return m.topic }
+func (m *mockOffsetManager) ConsumerGroup() string { return m.consumerGroup }
+func (m *mockOffsetManager) GroupLag(_ context.Context, _ time.Duration) (map[int32]kadm.GroupMemberLag, error) {
+	return nil, nil
+}
+func (m *mockOffsetManager) FetchLastCommittedOffset(_ context.Context, _ int32) (int64, error) {
+	return 0, nil
+}
+func (m *mockOffsetManager) FetchPartitionOffset(_ context.Context, _ int32, _ partition.SpecialOffset) (int64, error) {
+	return 0, nil
+}
+func (m *mockOffsetManager) Commit(_ context.Context, _ int32, _ int64) error {
+	return nil
+}
+
 func newTestEnv(builderID string) *testEnv {
 	queue := NewJobQueue()
-	scheduler := NewScheduler(Config{}, queue, nil, log.NewNopLogger(), prometheus.NewRegistry())
+	mockOffsetMgr := &mockOffsetManager{
+		topic:         "test-topic",
+		consumerGroup: "test-group",
+	}
+	scheduler := NewScheduler(Config{}, queue, mockOffsetMgr, log.NewNopLogger(), prometheus.NewRegistry())
 	transport := types.NewMemoryTransport(scheduler)
 
 	builder := NewWorker(builderID, transport)
@@ -51,8 +77,8 @@ func TestScheduleAndProcessJob(t *testing.T) {
 	if !ok {
 		t.Fatal("expected to receive job")
 	}
-	if receivedJob.ID != job.ID {
-		t.Errorf("got job ID %s, want %s", receivedJob.ID, job.ID)
+	if receivedJob.ID() != job.ID() {
+		t.Errorf("got job ID %s, want %s", receivedJob.ID(), job.ID())
 	}
 
 	// Builder completes job
@@ -124,7 +150,7 @@ func TestMultipleBuilders(t *testing.T) {
 	}
 
 	// Verify different jobs were assigned
-	if receivedJob1.ID == receivedJob2.ID {
+	if receivedJob1.ID() == receivedJob2.ID() {
 		t.Error("builders received same job")
 	}
 
diff --git a/pkg/blockbuilder/scheduler/strategy.go b/pkg/blockbuilder/scheduler/strategy.go
index 75719140a4ea0..d4d14bff0e44a 100644
--- a/pkg/blockbuilder/scheduler/strategy.go
+++ b/pkg/blockbuilder/scheduler/strategy.go
@@ -18,7 +18,7 @@ type OffsetReader interface {
 
 type Planner interface {
 	Name() string
-	Plan(ctx context.Context) ([]*JobWithPriority[int], error)
+	Plan(ctx context.Context) ([]*JobWithMetadata, error)
 }
 
 const (
@@ -46,14 +46,14 @@ func (p *RecordCountPlanner) Name() string {
 	return RecordCountStrategy
 }
 
-func (p *RecordCountPlanner) Plan(ctx context.Context) ([]*JobWithPriority[int], error) {
+func (p *RecordCountPlanner) Plan(ctx context.Context) ([]*JobWithMetadata, error) {
 	offsets, err := p.offsetReader.GroupLag(ctx)
 	if err != nil {
 		level.Error(p.logger).Log("msg", "failed to get group lag", "err", err)
 		return nil, err
 	}
 
-	jobs := make([]*JobWithPriority[int], 0, len(offsets))
+	jobs := make([]*JobWithMetadata, 0, len(offsets))
 	for _, partitionOffset := range offsets {
 		// kadm.GroupMemberLag contains valid Commit.At even when consumer group never committed any offset.
 		// no additional validation is needed here
@@ -69,11 +69,12 @@ func (p *RecordCountPlanner) Plan(ctx context.Context) ([]*JobWithPriority[int],
 		for currentStart := startOffset; currentStart < endOffset; {
 			currentEnd := min(currentStart+p.targetRecordCount, endOffset)
-			job := NewJobWithPriority(
+			job := NewJobWithMetadata(
 				types.NewJob(partitionOffset.Partition, types.Offsets{
 					Min: currentStart,
 					Max: currentEnd,
-				}), int(endOffset-currentStart), // priority is remaining records to process
+				}),
+				int(endOffset-currentStart), // priority is remaining records to process
 			)
 			jobs = append(jobs, job)
 
@@ -83,8 +84,8 @@
 
 	// Sort jobs by partition then priority
 	sort.Slice(jobs, func(i, j int) bool {
-		if jobs[i].Job.Partition != jobs[j].Job.Partition {
-			return jobs[i].Job.Partition < jobs[j].Job.Partition
+		if jobs[i].Job.Partition() != jobs[j].Job.Partition() {
+			return jobs[i].Job.Partition() < jobs[j].Job.Partition()
 		}
 		return jobs[i].Priority > jobs[j].Priority
 	})
diff --git a/pkg/blockbuilder/scheduler/strategy_test.go b/pkg/blockbuilder/scheduler/strategy_test.go
index 9c7b732fb4e08..6771aaf9868c5 100644
--- a/pkg/blockbuilder/scheduler/strategy_test.go
+++ b/pkg/blockbuilder/scheduler/strategy_test.go
@@ -19,11 +19,19 @@ func (m *mockOffsetReader) GroupLag(_ context.Context) (map[int32]kadm.GroupMemb
 	return m.groupLag, nil
 }
 
+// compareJobs compares two JobWithMetadata instances ignoring UpdateTime
+func compareJobs(t *testing.T, expected, actual *JobWithMetadata) {
+	require.Equal(t, expected.Job, actual.Job)
+	require.Equal(t, expected.Priority, actual.Priority)
+	require.Equal(t, expected.Status, actual.Status)
+	require.Equal(t, expected.StartTime, actual.StartTime)
+}
+
 func TestRecordCountPlanner_Plan(t *testing.T) {
 	for _, tc := range []struct {
 		name         string
 		recordCount  int64
-		expectedJobs []*JobWithPriority[int]
+		expectedJobs []*JobWithMetadata
 		groupLag     map[int32]kadm.GroupMemberLag
 	}{
 		{
@@ -40,8 +48,8 @@
 					Partition: 0,
 				},
 			},
-			expectedJobs: []*JobWithPriority[int]{
-				NewJobWithPriority(
+			expectedJobs: []*JobWithMetadata{
+				NewJobWithMetadata(
 					types.NewJob(0, types.Offsets{Min: 101, Max: 150}),
 					49, // 150-101
 				),
@@ -61,12 +69,12 @@
 					Partition: 0,
 				},
 			},
-			expectedJobs: []*JobWithPriority[int]{
-				NewJobWithPriority(
+			expectedJobs: []*JobWithMetadata{
+				NewJobWithMetadata(
 					types.NewJob(0, types.Offsets{Min: 101, Max: 151}),
 					99, // priority is total remaining: 200-101
 				),
-				NewJobWithPriority(
+				NewJobWithMetadata(
 					types.NewJob(0, types.Offsets{Min: 151, Max: 200}),
 					49, // priority is total remaining: 200-151
 				),
@@ -95,19 +103,19 @@
 					Partition: 1,
 				},
 			},
-			expectedJobs: []*JobWithPriority[int]{
-				NewJobWithPriority(
+			expectedJobs: []*JobWithMetadata{
+				NewJobWithMetadata(
+					types.NewJob(0, types.Offsets{Min: 101, Max: 150}),
+					49, // priority is total remaining: 150-101
+				),
+				NewJobWithMetadata(
 					types.NewJob(1, types.Offsets{Min: 201, Max: 301}),
 					199, // priority is total remaining: 400-201
 				),
-				NewJobWithPriority(
+				NewJobWithMetadata(
 					types.NewJob(1, types.Offsets{Min: 301, Max: 400}),
 					99, // priority is total remaining: 400-301
 				),
-				NewJobWithPriority(
-					types.NewJob(0, types.Offsets{Min: 101, Max: 150}),
-					49, // priority is total remaining: 150-101
-				),
 			},
 		},
 		{
@@ -145,7 +153,9 @@ func TestRecordCountPlanner_Plan(t *testing.T) {
 			require.NoError(t, err)
 
 			require.Equal(t, len(tc.expectedJobs), len(jobs))
-			require.ElementsMatch(t, tc.expectedJobs, jobs)
+			for i := range tc.expectedJobs {
+				compareJobs(t, tc.expectedJobs[i], jobs[i])
+			}
 		})
 	}
 }
diff --git a/pkg/blockbuilder/types/grpc_transport.go b/pkg/blockbuilder/types/grpc_transport.go
index 3b90ba9f20f06..4d52bdfc7745e 100644
--- a/pkg/blockbuilder/types/grpc_transport.go
+++ b/pkg/blockbuilder/types/grpc_transport.go
@@ -110,9 +110,9 @@ func protoToJob(p *proto.Job) *Job {
 		return nil
 	}
 	return &Job{
-		ID:        p.GetId(),
-		Partition: p.GetPartition(),
-		Offsets: Offsets{
+		id:        p.GetId(),
+		partition: p.GetPartition(),
+		offsets: Offsets{
 			Min: p.GetOffsets().GetMin(),
 			Max: p.GetOffsets().GetMax(),
 		},
@@ -125,11 +125,11 @@ func jobToProto(j *Job) *proto.Job {
 		return nil
 	}
 	return &proto.Job{
-		Id:        j.ID,
-		Partition: j.Partition,
+		Id:        j.ID(),
+		Partition: j.Partition(),
 		Offsets: &proto.Offsets{
-			Min: j.Offsets.Min,
-			Max: j.Offsets.Max,
+			Min: j.offsets.Min,
+			Max: j.offsets.Max,
 		},
 	}
 }
diff --git a/pkg/blockbuilder/types/interfaces.go b/pkg/blockbuilder/types/interfaces.go
index 2144e83878cb5..5ed51b39caf2f 100644
--- a/pkg/blockbuilder/types/interfaces.go
+++ b/pkg/blockbuilder/types/interfaces.go
@@ -15,11 +15,11 @@ type BuilderTransport interface {
 // SchedulerHandler defines the business logic for handling builder requests
 type SchedulerHandler interface {
 	// HandleGetJob processes a request for a new job
-	HandleGetJob(ctx context.Context, builderID string) (*Job, bool, error)
+	HandleGetJob(ctx context.Context) (*Job, bool, error)
 	// HandleCompleteJob processes a job completion notification
-	HandleCompleteJob(ctx context.Context, builderID string, job *Job, success bool) error
+	HandleCompleteJob(ctx context.Context, job *Job, success bool) error
 	// HandleSyncJob processes a job sync request
-	HandleSyncJob(ctx context.Context, builderID string, job *Job) error
+	HandleSyncJob(ctx context.Context, job *Job) error
 }
 
 // Request/Response message types
diff --git a/pkg/blockbuilder/types/job.go b/pkg/blockbuilder/types/job.go
index 9cf94daebd484..ca23aa003b96e 100644
--- a/pkg/blockbuilder/types/job.go
+++ b/pkg/blockbuilder/types/job.go
@@ -4,21 +4,53 @@ import "fmt"
 
 // Job represents a block building task.
 type Job struct {
-	ID string
+	id string
 	// Partition and offset information
-	Partition int32
-	Offsets   Offsets
+	partition int32
+	offsets   Offsets
+}
+
+func (j *Job) ID() string {
+	return j.id
+}
+
+func (j *Job) Partition() int32 {
+	return j.partition
+}
+
+func (j *Job) Offsets() Offsets {
+	return j.offsets
 }
 
 // JobStatus represents the current state of a job
 type JobStatus int
 
 const (
-	JobStatusPending JobStatus = iota
+	JobStatusUnknown JobStatus = iota // zero value, largely unused
+	JobStatusPending
 	JobStatusInProgress
 	JobStatusComplete
+	JobStatusFailed  // Job failed and may be retried
+	JobStatusExpired // Job failed too many times or is too old
 )
 
+func (s JobStatus) String() string {
+	switch s {
+	case JobStatusPending:
+		return "pending"
+	case JobStatusInProgress:
+		return "in_progress"
+	case JobStatusComplete:
+		return "complete"
+	case JobStatusFailed:
+		return "failed"
+	case JobStatusExpired:
+		return "expired"
+	default:
+		return "unknown"
+	}
+}
+
 // Offsets represents the range of offsets to process
 type Offsets struct {
 	Min int64
@@ -28,9 +60,9 @@ type Offsets struct {
 // NewJob creates a new job with the given partition and offsets
 func NewJob(partition int32, offsets Offsets) *Job {
 	return &Job{
-		ID:        GenerateJobID(partition, offsets),
-		Partition: partition,
-		Offsets:   offsets,
+		id:        GenerateJobID(partition, offsets),
+		partition: partition,
+		offsets:   offsets,
 	}
 }
 
diff --git a/pkg/blockbuilder/types/scheduler_server.go b/pkg/blockbuilder/types/scheduler_server.go
index c2756903859f2..a5deaa276d622 100644
--- a/pkg/blockbuilder/types/scheduler_server.go
+++ b/pkg/blockbuilder/types/scheduler_server.go
@@ -20,8 +20,8 @@ func NewSchedulerServer(handler SchedulerHandler) proto.SchedulerServiceServer {
 }
 
 // GetJob implements proto.SchedulerServiceServer
-func (s *schedulerServer) GetJob(ctx context.Context, req *proto.GetJobRequest) (*proto.GetJobResponse, error) {
-	job, ok, err := s.handler.HandleGetJob(ctx, req.BuilderId)
+func (s *schedulerServer) GetJob(ctx context.Context, _ *proto.GetJobRequest) (*proto.GetJobResponse, error) {
+	job, ok, err := s.handler.HandleGetJob(ctx)
 	if err != nil {
 		return nil, status.Error(codes.Internal, err.Error())
 	}
@@ -39,7 +39,7 @@ func (s *schedulerServer) GetJob(ctx context.Context, req *proto.GetJobRequest)
 
 // CompleteJob implements proto.SchedulerServiceServer
 func (s *schedulerServer) CompleteJob(ctx context.Context, req *proto.CompleteJobRequest) (*proto.CompleteJobResponse, error) {
-	if err := s.handler.HandleCompleteJob(ctx, req.BuilderId, protoToJob(req.Job), req.Success); err != nil {
+	if err := s.handler.HandleCompleteJob(ctx, protoToJob(req.Job), req.Success); err != nil {
 		return nil, status.Error(codes.Internal, err.Error())
 	}
 	return &proto.CompleteJobResponse{}, nil
@@ -47,7 +47,7 @@ func (s *schedulerServer) CompleteJob(ctx context.Context, req *proto.CompleteJo
 
 // SyncJob implements proto.SchedulerServiceServer
 func (s *schedulerServer) SyncJob(ctx context.Context, req *proto.SyncJobRequest) (*proto.SyncJobResponse, error) {
-	if err := s.handler.HandleSyncJob(ctx, req.BuilderId, protoToJob(req.Job)); err != nil {
+	if err := s.handler.HandleSyncJob(ctx, protoToJob(req.Job)); err != nil {
 		return nil, status.Error(codes.Internal, err.Error())
 	}
 	return &proto.SyncJobResponse{}, nil
diff --git a/pkg/blockbuilder/types/transport.go b/pkg/blockbuilder/types/transport.go
index 6f7dc41e394fe..ac8917b854c36 100644
--- a/pkg/blockbuilder/types/transport.go
+++ b/pkg/blockbuilder/types/transport.go
@@ -36,8 +36,8 @@ func NewMemoryTransport(scheduler SchedulerHandler) *MemoryTransport {
 	}
 }
 
-func (t *MemoryTransport) SendGetJobRequest(ctx context.Context, req *GetJobRequest) (*GetJobResponse, error) {
-	job, ok, err := t.scheduler.HandleGetJob(ctx, req.BuilderID)
+func (t *MemoryTransport) SendGetJobRequest(ctx context.Context, _ *GetJobRequest) (*GetJobResponse, error) {
+	job, ok, err := t.scheduler.HandleGetJob(ctx)
 	if err != nil {
 		return nil, err
 	}
@@ -48,9 +48,9 @@ func (t *MemoryTransport) SendGetJobRequ
 }
 
 func (t *MemoryTransport) SendCompleteJob(ctx context.Context, req *CompleteJobRequest) error {
-	return t.scheduler.HandleCompleteJob(ctx, req.BuilderID, req.Job, req.Success)
+	return t.scheduler.HandleCompleteJob(ctx, req.Job, req.Success)
 }
 
 func (t *MemoryTransport) SendSyncJob(ctx context.Context, req *SyncJobRequest) error {
-	return t.scheduler.HandleSyncJob(ctx, req.BuilderID, req.Job)
+	return t.scheduler.HandleSyncJob(ctx, req.Job)
 }
diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go
index 395a891e62103..e30136c2b8637 100644
--- a/pkg/loki/modules.go
+++ b/pkg/loki/modules.go
@@ -1866,7 +1866,7 @@ func (t *Loki) initBlockScheduler() (services.Service, error) {
 
 	s := blockscheduler.NewScheduler(
 		t.Cfg.BlockScheduler,
-		blockscheduler.NewJobQueue(),
+		blockscheduler.NewJobQueueWithLogger(logger),
 		offsetManager,
 		logger,
 		prometheus.DefaultRegisterer,
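For reviewers, a minimal usage sketch of the keyed priority queue this patch introduces. It is not part of the diff; it assumes it lives in the same scheduler package so the unexported constructor is visible, and the exampleJob type and its values are made up for illustration.

package scheduler

import "fmt"

type exampleJob struct {
	id       string
	priority int
}

func ExamplePriorityQueueUsage() {
	// Order by descending priority; key by job ID for constant-time lookups.
	pq := NewPriorityQueue[string, exampleJob](
		func(a, b exampleJob) bool { return a.priority > b.priority },
		func(j exampleJob) string { return j.id },
	)

	pq.Push(exampleJob{id: "a", priority: 1})
	pq.Push(exampleJob{id: "b", priority: 5})
	pq.Push(exampleJob{id: "a", priority: 7}) // same key: updates "a" in place, Len stays 2

	if j, ok := pq.Lookup("a"); ok {
		fmt.Println(j.priority) // 7
	}

	pq.UpdatePriority("b", exampleJob{id: "b", priority: 9})

	if j, ok := pq.Pop(); ok {
		fmt.Println(j.id) // "b", since 9 > 7
	}

	if _, ok := pq.Remove("a"); ok {
		fmt.Println(pq.Len()) // 0
	}
}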