From fe412860cdac0992b6388a8bd0ccb3f3670b1e36 Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Tue, 10 Dec 2024 11:58:43 +0800 Subject: [PATCH 01/16] key lookups for priority queue --- .../scheduler/prioritiy_queue_test.go | 81 ++++++++++++--- pkg/blockbuilder/scheduler/priority_queue.go | 99 ++++++++++++++++--- pkg/blockbuilder/scheduler/queue.go | 19 ++-- pkg/blockbuilder/scheduler/scheduler.go | 10 +- 4 files changed, 173 insertions(+), 36 deletions(-) diff --git a/pkg/blockbuilder/scheduler/prioritiy_queue_test.go b/pkg/blockbuilder/scheduler/prioritiy_queue_test.go index b27d950aa04b0..02665518e3b14 100644 --- a/pkg/blockbuilder/scheduler/prioritiy_queue_test.go +++ b/pkg/blockbuilder/scheduler/prioritiy_queue_test.go @@ -7,7 +7,7 @@ import ( ) func TestPriorityQueue(t *testing.T) { - t.Run("operations", func(t *testing.T) { + t.Run("basic operations", func(t *testing.T) { tests := []struct { name string input []int @@ -33,16 +33,14 @@ func TestPriorityQueue(t *testing.T) { input: []int{3, 1, 2}, wantPops: []int{1, 2, 3}, }, - { - name: "duplicate elements", - input: []int{2, 1, 2, 1}, - wantPops: []int{1, 1, 2, 2}, - }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - pq := NewPriorityQueue[int](func(a, b int) bool { return a < b }) + pq := NewPriorityQueue[int, int]( + func(a, b int) bool { return a < b }, + func(a int) int { return a }, + ) require.Equal(t, 0, pq.Len()) // Push all elements @@ -69,15 +67,73 @@ func TestPriorityQueue(t *testing.T) { } }) + t.Run("key operations", func(t *testing.T) { + type Job struct { + ID string + Priority int + } + + pq := NewPriorityQueue[Job, string]( + func(a, b Job) bool { return a.Priority < b.Priority }, + func(j Job) string { return j.ID }, + ) + + // Test Push with duplicate key + job1 := Job{ID: "job1", Priority: 1} + job1Updated := Job{ID: "job1", Priority: 3} + job2 := Job{ID: "job2", Priority: 2} + + pq.Push(job1) + require.Equal(t, 1, pq.Len()) + + // Push with same key should update + pq.Push(job1Updated) + require.Equal(t, 1, pq.Len()) + + // Verify updated priority + v, ok := pq.Lookup("job1") + require.True(t, ok) + require.Equal(t, job1Updated, v) + + // Test Remove + pq.Push(job2) + v, ok = pq.Remove("job1") + require.True(t, ok) + require.Equal(t, job1Updated, v) + require.Equal(t, 1, pq.Len()) + + // Test UpdatePriority + newJob2 := Job{ID: "job2", Priority: 4} + ok = pq.UpdatePriority("job2", newJob2) + require.True(t, ok) + + v, ok = pq.Lookup("job2") + require.True(t, ok) + require.Equal(t, newJob2, v) + + // Test non-existent key operations + v, ok = pq.Lookup("nonexistent") + require.False(t, ok) + require.Zero(t, v) + + v, ok = pq.Remove("nonexistent") + require.False(t, ok) + require.Zero(t, v) + + ok = pq.UpdatePriority("nonexistent", Job{}) + require.False(t, ok) + }) + t.Run("custom type", func(t *testing.T) { type Job struct { ID string Priority int } - pq := NewPriorityQueue[Job](func(a, b Job) bool { - return a.Priority < b.Priority - }) + pq := NewPriorityQueue[Job, string]( + func(a, b Job) bool { return a.Priority < b.Priority }, + func(j Job) string { return j.ID }, + ) jobs := []Job{ {ID: "high", Priority: 3}, @@ -102,7 +158,10 @@ func TestPriorityQueue(t *testing.T) { }) t.Run("mixed operations", func(t *testing.T) { - pq := NewPriorityQueue[int](func(a, b int) bool { return a < b }) + pq := NewPriorityQueue[int, int]( + func(a, b int) bool { return a < b }, + func(a int) int { return a }, + ) // Push some elements pq.Push(3) diff --git a/pkg/blockbuilder/scheduler/priority_queue.go b/pkg/blockbuilder/scheduler/priority_queue.go index 3b488716cabe8..6a2a3a211fc04 100644 --- a/pkg/blockbuilder/scheduler/priority_queue.go +++ b/pkg/blockbuilder/scheduler/priority_queue.go @@ -4,44 +4,103 @@ import ( "container/heap" ) -// PriorityQueue is a generic priority queue. -type PriorityQueue[T any] struct { - h *priorityHeap[T] +// PriorityQueue is a generic priority queue with constant time lookups. +type PriorityQueue[T any, K comparable] struct { + h *priorityHeap[T] + m map[K]*item[T] // Map for constant time lookups + key func(T) K // Function to extract key from item +} + +// item represents an item in the priority queue with its index +type item[T any] struct { + value T + index int } // NewPriorityQueue creates a new priority queue. -func NewPriorityQueue[T any](less func(T, T) bool) *PriorityQueue[T] { +func NewPriorityQueue[T any, K comparable](less func(T, T) bool, key func(T) K) *PriorityQueue[T, K] { h := &priorityHeap[T]{ less: less, - heap: make([]T, 0), + heap: make([]*item[T], 0), + idx: make(map[int]*item[T]), } heap.Init(h) - return &PriorityQueue[T]{h: h} + return &PriorityQueue[T, K]{ + h: h, + m: make(map[K]*item[T]), + key: key, + } } // Push adds an element to the queue. -func (pq *PriorityQueue[T]) Push(v T) { - heap.Push(pq.h, v) +func (pq *PriorityQueue[T, K]) Push(v T) { + k := pq.key(v) + if existing, ok := pq.m[k]; ok { + // Update existing item's value and fix heap + existing.value = v + heap.Fix(pq.h, existing.index) + return + } + + // Add new item + idx := pq.h.Len() + it := &item[T]{value: v, index: idx} + pq.m[k] = it + heap.Push(pq.h, it) } // Pop removes and returns the element with the highest priority from the queue. -func (pq *PriorityQueue[T]) Pop() (T, bool) { +func (pq *PriorityQueue[T, K]) Pop() (T, bool) { if pq.Len() == 0 { var zero T return zero, false } - return heap.Pop(pq.h).(T), true + it := heap.Pop(pq.h).(*item[T]) + delete(pq.m, pq.key(it.value)) + return it.value, true +} + +// Lookup returns the item with the given key if it exists. +func (pq *PriorityQueue[T, K]) Lookup(k K) (T, bool) { + if it, ok := pq.m[k]; ok { + return it.value, true + } + var zero T + return zero, false +} + +// Remove removes and returns the item with the given key if it exists. +func (pq *PriorityQueue[T, K]) Remove(k K) (T, bool) { + it, ok := pq.m[k] + if !ok { + var zero T + return zero, false + } + heap.Remove(pq.h, it.index) + delete(pq.m, k) + return it.value, true +} + +// UpdatePriority updates the priority of an item and reorders the queue. +func (pq *PriorityQueue[T, K]) UpdatePriority(k K, v T) bool { + if it, ok := pq.m[k]; ok { + it.value = v + heap.Fix(pq.h, it.index) + return true + } + return false } // Len returns the number of elements in the queue. -func (pq *PriorityQueue[T]) Len() int { +func (pq *PriorityQueue[T, K]) Len() int { return pq.h.Len() } // priorityHeap is the internal heap implementation that satisfies heap.Interface. type priorityHeap[T any] struct { less func(T, T) bool - heap []T + heap []*item[T] + idx map[int]*item[T] // Maps index to item for efficient updates } func (h *priorityHeap[T]) Len() int { @@ -49,23 +108,31 @@ func (h *priorityHeap[T]) Len() int { } func (h *priorityHeap[T]) Less(i, j int) bool { - return h.less(h.heap[i], h.heap[j]) + return h.less(h.heap[i].value, h.heap[j].value) } func (h *priorityHeap[T]) Swap(i, j int) { h.heap[i], h.heap[j] = h.heap[j], h.heap[i] + h.heap[i].index = i + h.heap[j].index = j + h.idx[i] = h.heap[i] + h.idx[j] = h.heap[j] } func (h *priorityHeap[T]) Push(x any) { - h.heap = append(h.heap, x.(T)) + it := x.(*item[T]) + it.index = len(h.heap) + h.heap = append(h.heap, it) + h.idx[it.index] = it } func (h *priorityHeap[T]) Pop() any { old := h.heap n := len(old) - x := old[n-1] + it := old[n-1] h.heap = old[0 : n-1] - return x + delete(h.idx, it.index) + return it } // CircularBuffer is a generic circular buffer. diff --git a/pkg/blockbuilder/scheduler/queue.go b/pkg/blockbuilder/scheduler/queue.go index dab46f164908d..16fe2499bef28 100644 --- a/pkg/blockbuilder/scheduler/queue.go +++ b/pkg/blockbuilder/scheduler/queue.go @@ -39,19 +39,24 @@ func (j *inProgressJob) Duration() time.Duration { // JobQueue manages the queue of pending jobs and tracks their state. type JobQueue struct { - pending *PriorityQueue[*JobWithPriority[int]] // Jobs waiting to be processed, ordered by priority - inProgress map[string]*inProgressJob // Jobs currently being processed, key is job ID - completed *CircularBuffer[*types.Job] // Last N completed jobs - statusMap map[string]types.JobStatus // Maps job ID to its current status + pending *PriorityQueue[*JobWithPriority[int], string] // Jobs waiting to be processed, ordered by priority + inProgress map[string]*inProgressJob // Jobs currently being processed, key is job ID + completed *CircularBuffer[*types.Job] // Last N completed jobs + statusMap map[string]types.JobStatus // Maps job ID to its current status mu sync.RWMutex } // NewJobQueue creates a new job queue instance func NewJobQueue() *JobQueue { return &JobQueue{ - pending: NewPriorityQueue[*JobWithPriority[int]](func(a, b *JobWithPriority[int]) bool { - return a.Priority > b.Priority // Higher priority first - }), + pending: NewPriorityQueue[*JobWithPriority[int]]( + func(a, b *JobWithPriority[int]) bool { + return a.Priority > b.Priority // Higher priority first + }, + func(a *JobWithPriority[int]) string { + return a.Job.ID + }, + ), inProgress: make(map[string]*inProgressJob), completed: NewCircularBuffer[*types.Job](defaultCompletedJobsCapacity), statusMap: make(map[string]types.JobStatus), diff --git a/pkg/blockbuilder/scheduler/scheduler.go b/pkg/blockbuilder/scheduler/scheduler.go index 4eb1eaedde9ec..3054331b57115 100644 --- a/pkg/blockbuilder/scheduler/scheduler.go +++ b/pkg/blockbuilder/scheduler/scheduler.go @@ -146,6 +146,7 @@ func (s *BlockScheduler) runOnce(ctx context.Context) error { // TODO: end offset keeps moving each time we plan jobs, maybe we should not use it as part of the job ID if status, ok := s.queue.Exists(job.Job); ok { level.Debug(s.logger).Log("msg", "job already exists", "job", job, "status", status) + // TODO: update priority continue } @@ -174,8 +175,13 @@ func (s *BlockScheduler) HandleGetJob(ctx context.Context, builderID string) (*t } } -func (s *BlockScheduler) HandleCompleteJob(_ context.Context, _ string, job *types.Job, _ bool) error { - // TODO: handle commits +func (s *BlockScheduler) HandleCompleteJob(_ context.Context, _ string, job *types.Job, success bool) error { + logger := log.With(s.logger, "job", job.ID) + + if !success { + level.Error(logger).Log("msg", "job failed") + return nil + } s.queue.MarkComplete(job.ID) return nil } From f493c535518152d00dda67bfbf95e9eeb5ff45bf Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Tue, 10 Dec 2024 13:05:03 +0800 Subject: [PATCH 02/16] priorityqueue uses expected k/v annotations, works on scheduler handler --- .../scheduler/prioritiy_queue_test.go | 20 ++--- pkg/blockbuilder/scheduler/priority_queue.go | 86 +++++++++---------- pkg/blockbuilder/scheduler/queue.go | 4 +- pkg/blockbuilder/scheduler/scheduler.go | 56 ++++++++++-- pkg/blockbuilder/types/job.go | 4 + 5 files changed, 109 insertions(+), 61 deletions(-) diff --git a/pkg/blockbuilder/scheduler/prioritiy_queue_test.go b/pkg/blockbuilder/scheduler/prioritiy_queue_test.go index 02665518e3b14..05b8ee07679b8 100644 --- a/pkg/blockbuilder/scheduler/prioritiy_queue_test.go +++ b/pkg/blockbuilder/scheduler/prioritiy_queue_test.go @@ -39,7 +39,7 @@ func TestPriorityQueue(t *testing.T) { t.Run(tt.name, func(t *testing.T) { pq := NewPriorityQueue[int, int]( func(a, b int) bool { return a < b }, - func(a int) int { return a }, + func(v int) int { return v }, ) require.Equal(t, 0, pq.Len()) @@ -73,7 +73,7 @@ func TestPriorityQueue(t *testing.T) { Priority int } - pq := NewPriorityQueue[Job, string]( + pq := NewPriorityQueue[string, Job]( func(a, b Job) bool { return a.Priority < b.Priority }, func(j Job) string { return j.ID }, ) @@ -130,7 +130,7 @@ func TestPriorityQueue(t *testing.T) { Priority int } - pq := NewPriorityQueue[Job, string]( + pq := NewPriorityQueue[string, Job]( func(a, b Job) bool { return a.Priority < b.Priority }, func(j Job) string { return j.ID }, ) @@ -160,26 +160,26 @@ func TestPriorityQueue(t *testing.T) { t.Run("mixed operations", func(t *testing.T) { pq := NewPriorityQueue[int, int]( func(a, b int) bool { return a < b }, - func(a int) int { return a }, + func(v int) int { return v }, ) // Push some elements pq.Push(3) pq.Push(1) - require.Equal(t, 2, pq.Len()) + pq.Push(4) - // Pop lowest + // Pop an element v, ok := pq.Pop() require.True(t, ok) require.Equal(t, 1, v) // Push more elements pq.Push(2) - pq.Push(4) + pq.Push(5) - // Verify remaining elements come out in order - want := []int{2, 3, 4} - got := make([]int, 0, 3) + // Pop remaining elements and verify order + want := []int{2, 3, 4, 5} + got := make([]int, 0, len(want)) for range want { v, ok := pq.Pop() require.True(t, ok) diff --git a/pkg/blockbuilder/scheduler/priority_queue.go b/pkg/blockbuilder/scheduler/priority_queue.go index 6a2a3a211fc04..248f8ff2d98b6 100644 --- a/pkg/blockbuilder/scheduler/priority_queue.go +++ b/pkg/blockbuilder/scheduler/priority_queue.go @@ -5,35 +5,35 @@ import ( ) // PriorityQueue is a generic priority queue with constant time lookups. -type PriorityQueue[T any, K comparable] struct { - h *priorityHeap[T] - m map[K]*item[T] // Map for constant time lookups - key func(T) K // Function to extract key from item +type PriorityQueue[K comparable, V any] struct { + h *priorityHeap[V] + m map[K]*item[V] // Map for constant time lookups + key func(V) K // Function to extract key from value } // item represents an item in the priority queue with its index -type item[T any] struct { - value T +type item[V any] struct { + value V index int } // NewPriorityQueue creates a new priority queue. -func NewPriorityQueue[T any, K comparable](less func(T, T) bool, key func(T) K) *PriorityQueue[T, K] { - h := &priorityHeap[T]{ +func NewPriorityQueue[K comparable, V any](less func(V, V) bool, key func(V) K) *PriorityQueue[K, V] { + h := &priorityHeap[V]{ less: less, - heap: make([]*item[T], 0), - idx: make(map[int]*item[T]), + heap: make([]*item[V], 0), + idx: make(map[int]*item[V]), } heap.Init(h) - return &PriorityQueue[T, K]{ + return &PriorityQueue[K, V]{ h: h, - m: make(map[K]*item[T]), + m: make(map[K]*item[V]), key: key, } } // Push adds an element to the queue. -func (pq *PriorityQueue[T, K]) Push(v T) { +func (pq *PriorityQueue[K, V]) Push(v V) { k := pq.key(v) if existing, ok := pq.m[k]; ok { // Update existing item's value and fix heap @@ -44,36 +44,36 @@ func (pq *PriorityQueue[T, K]) Push(v T) { // Add new item idx := pq.h.Len() - it := &item[T]{value: v, index: idx} + it := &item[V]{value: v, index: idx} pq.m[k] = it heap.Push(pq.h, it) } // Pop removes and returns the element with the highest priority from the queue. -func (pq *PriorityQueue[T, K]) Pop() (T, bool) { +func (pq *PriorityQueue[K, V]) Pop() (V, bool) { if pq.Len() == 0 { - var zero T + var zero V return zero, false } - it := heap.Pop(pq.h).(*item[T]) + it := heap.Pop(pq.h).(*item[V]) delete(pq.m, pq.key(it.value)) return it.value, true } // Lookup returns the item with the given key if it exists. -func (pq *PriorityQueue[T, K]) Lookup(k K) (T, bool) { +func (pq *PriorityQueue[K, V]) Lookup(k K) (V, bool) { if it, ok := pq.m[k]; ok { return it.value, true } - var zero T + var zero V return zero, false } // Remove removes and returns the item with the given key if it exists. -func (pq *PriorityQueue[T, K]) Remove(k K) (T, bool) { +func (pq *PriorityQueue[K, V]) Remove(k K) (V, bool) { it, ok := pq.m[k] if !ok { - var zero T + var zero V return zero, false } heap.Remove(pq.h, it.index) @@ -82,7 +82,7 @@ func (pq *PriorityQueue[T, K]) Remove(k K) (T, bool) { } // UpdatePriority updates the priority of an item and reorders the queue. -func (pq *PriorityQueue[T, K]) UpdatePriority(k K, v T) bool { +func (pq *PriorityQueue[K, V]) UpdatePriority(k K, v V) bool { if it, ok := pq.m[k]; ok { it.value = v heap.Fix(pq.h, it.index) @@ -92,26 +92,26 @@ func (pq *PriorityQueue[T, K]) UpdatePriority(k K, v T) bool { } // Len returns the number of elements in the queue. -func (pq *PriorityQueue[T, K]) Len() int { +func (pq *PriorityQueue[K, V]) Len() int { return pq.h.Len() } // priorityHeap is the internal heap implementation that satisfies heap.Interface. -type priorityHeap[T any] struct { - less func(T, T) bool - heap []*item[T] - idx map[int]*item[T] // Maps index to item for efficient updates +type priorityHeap[V any] struct { + less func(V, V) bool + heap []*item[V] + idx map[int]*item[V] // Maps index to item for efficient updates } -func (h *priorityHeap[T]) Len() int { +func (h *priorityHeap[V]) Len() int { return len(h.heap) } -func (h *priorityHeap[T]) Less(i, j int) bool { +func (h *priorityHeap[V]) Less(i, j int) bool { return h.less(h.heap[i].value, h.heap[j].value) } -func (h *priorityHeap[T]) Swap(i, j int) { +func (h *priorityHeap[V]) Swap(i, j int) { h.heap[i], h.heap[j] = h.heap[j], h.heap[i] h.heap[i].index = i h.heap[j].index = j @@ -119,14 +119,14 @@ func (h *priorityHeap[T]) Swap(i, j int) { h.idx[j] = h.heap[j] } -func (h *priorityHeap[T]) Push(x any) { - it := x.(*item[T]) +func (h *priorityHeap[V]) Push(x any) { + it := x.(*item[V]) it.index = len(h.heap) h.heap = append(h.heap, it) h.idx[it.index] = it } -func (h *priorityHeap[T]) Pop() any { +func (h *priorityHeap[V]) Pop() any { old := h.heap n := len(old) it := old[n-1] @@ -136,17 +136,17 @@ func (h *priorityHeap[T]) Pop() any { } // CircularBuffer is a generic circular buffer. -type CircularBuffer[T any] struct { - buffer []T +type CircularBuffer[V any] struct { + buffer []V size int head int tail int } // NewCircularBuffer creates a new circular buffer with the given capacity. -func NewCircularBuffer[T any](capacity int) *CircularBuffer[T] { - return &CircularBuffer[T]{ - buffer: make([]T, capacity), +func NewCircularBuffer[V any](capacity int) *CircularBuffer[V] { + return &CircularBuffer[V]{ + buffer: make([]V, capacity), size: 0, head: 0, tail: 0, @@ -154,8 +154,8 @@ func NewCircularBuffer[T any](capacity int) *CircularBuffer[T] { } // Push adds an element to the circular buffer and returns the evicted element if any -func (b *CircularBuffer[T]) Push(v T) (T, bool) { - var evicted T +func (b *CircularBuffer[V]) Push(v V) (V, bool) { + var evicted V hasEvicted := false if b.size == len(b.buffer) { @@ -174,9 +174,9 @@ func (b *CircularBuffer[T]) Push(v T) (T, bool) { } // Pop removes and returns the oldest element from the buffer -func (b *CircularBuffer[T]) Pop() (T, bool) { +func (b *CircularBuffer[V]) Pop() (V, bool) { if b.size == 0 { - var zero T + var zero V return zero, false } @@ -188,6 +188,6 @@ func (b *CircularBuffer[T]) Pop() (T, bool) { } // Len returns the number of elements in the buffer -func (b *CircularBuffer[T]) Len() int { +func (b *CircularBuffer[V]) Len() int { return b.size } diff --git a/pkg/blockbuilder/scheduler/queue.go b/pkg/blockbuilder/scheduler/queue.go index 16fe2499bef28..bbf9796213131 100644 --- a/pkg/blockbuilder/scheduler/queue.go +++ b/pkg/blockbuilder/scheduler/queue.go @@ -39,7 +39,7 @@ func (j *inProgressJob) Duration() time.Duration { // JobQueue manages the queue of pending jobs and tracks their state. type JobQueue struct { - pending *PriorityQueue[*JobWithPriority[int], string] // Jobs waiting to be processed, ordered by priority + pending *PriorityQueue[string, *JobWithPriority[int]] // Jobs waiting to be processed, ordered by priority inProgress map[string]*inProgressJob // Jobs currently being processed, key is job ID completed *CircularBuffer[*types.Job] // Last N completed jobs statusMap map[string]types.JobStatus // Maps job ID to its current status @@ -49,7 +49,7 @@ type JobQueue struct { // NewJobQueue creates a new job queue instance func NewJobQueue() *JobQueue { return &JobQueue{ - pending: NewPriorityQueue[*JobWithPriority[int]]( + pending: NewPriorityQueue( func(a, b *JobWithPriority[int]) bool { return a.Priority > b.Priority // Higher priority first }, diff --git a/pkg/blockbuilder/scheduler/scheduler.go b/pkg/blockbuilder/scheduler/scheduler.go index 3054331b57115..7f88f0f45dc6b 100644 --- a/pkg/blockbuilder/scheduler/scheduler.go +++ b/pkg/blockbuilder/scheduler/scheduler.go @@ -144,15 +144,53 @@ func (s *BlockScheduler) runOnce(ctx context.Context) error { for _, job := range jobs { // TODO: end offset keeps moving each time we plan jobs, maybe we should not use it as part of the job ID - if status, ok := s.queue.Exists(job.Job); ok { - level.Debug(s.logger).Log("msg", "job already exists", "job", job, "status", status) - // TODO: update priority + + logger := log.With( + s.logger, + "job", job.Job.ID, + "priority", job.Priority, + ) + + status, ok := s.queue.Exists(job.Job) + + // scheduler is unaware of incoming job; enqueue + if !ok { + level.Debug(logger).Log( + "msg", "job does not exist, enqueueing", + ) + + // enqueue + if err := s.queue.Enqueue(job.Job, job.Priority); err != nil { + level.Error(logger).Log("msg", "failed to enqueue job", "err", err) + } + continue } - if err := s.queue.Enqueue(job.Job, job.Priority); err != nil { - level.Error(s.logger).Log("msg", "failed to enqueue job", "job", job, "err", err) + // scheduler is aware of incoming job; handling depends on status + switch status { + case types.JobStatusPending: + level.Debug(s.logger).Log( + "msg", "job is pending, updating priority", + "old_priority", job.Priority, + ) + s.queue.pending.UpdatePriority(job.Job.ID, job) + case types.JobStatusInProgress: + level.Debug(s.logger).Log( + "msg", "job is in progress, ignoring", + ) + case types.JobStatusComplete: + // shouldn't happen + level.Debug(s.logger).Log( + "msg", "job is complete, ignoring", + ) + default: + level.Error(s.logger).Log( + "msg", "job has unknown status, ignoring", + "status", status, + ) } + } return nil @@ -178,8 +216,14 @@ func (s *BlockScheduler) HandleGetJob(ctx context.Context, builderID string) (*t func (s *BlockScheduler) HandleCompleteJob(_ context.Context, _ string, job *types.Job, success bool) error { logger := log.With(s.logger, "job", job.ID) + status, exists := s.queue.Exists(job) + if !exists { + level.Error(logger).Log("msg", "cannot complete job, job does not exist") + return nil + } + if !success { - level.Error(logger).Log("msg", "job failed") + level.Error(logger).Log("msg", "job failed, re-enqueuing") return nil } s.queue.MarkComplete(job.ID) diff --git a/pkg/blockbuilder/types/job.go b/pkg/blockbuilder/types/job.go index 9cf94daebd484..74405b7ad2206 100644 --- a/pkg/blockbuilder/types/job.go +++ b/pkg/blockbuilder/types/job.go @@ -10,6 +10,10 @@ type Job struct { Offsets Offsets } +func (j *Job) GetID() string { + return j.ID +} + // JobStatus represents the current state of a job type JobStatus int From fddd01ae903a30c315b12c1799aa32f06809690a Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Tue, 10 Dec 2024 13:19:58 +0800 Subject: [PATCH 03/16] fleshing out support for non successful statuses --- pkg/blockbuilder/scheduler/queue.go | 138 +++++++++++++++--------- pkg/blockbuilder/scheduler/scheduler.go | 18 ++-- pkg/blockbuilder/types/job.go | 19 ++++ 3 files changed, 112 insertions(+), 63 deletions(-) diff --git a/pkg/blockbuilder/scheduler/queue.go b/pkg/blockbuilder/scheduler/queue.go index bbf9796213131..9e443de379888 100644 --- a/pkg/blockbuilder/scheduler/queue.go +++ b/pkg/blockbuilder/scheduler/queue.go @@ -12,36 +12,37 @@ const ( defaultCompletedJobsCapacity = 100 ) -// JobWithPriority wraps a job with a priority value -type JobWithPriority[T comparable] struct { - Job *types.Job - Priority T +// JobWithPriority wraps a job with its priority +type JobWithPriority[P any] struct { + *types.Job + Priority P } -// NewJobWithPriority creates a new JobWithPriority instance -func NewJobWithPriority[T comparable](job *types.Job, priority T) *JobWithPriority[T] { - return &JobWithPriority[T]{ +func NewJobWithPriority[P any](job *types.Job, priority P) *JobWithPriority[P] { + return &JobWithPriority[P]{ Job: job, Priority: priority, } } -// inProgressJob contains a job and its start time -type inProgressJob struct { - job *types.Job - startTime time.Time +// JobWithStatus wraps a job with its completion status and time +type JobWithStatus struct { + *types.Job + Status types.JobStatus + CompletedAt time.Time } -// Duration returns how long the job has been running -func (j *inProgressJob) Duration() time.Duration { - return time.Since(j.startTime) +// inProgressJob tracks a job that is currently being processed +type inProgressJob struct { + *types.Job + StartTime time.Time } // JobQueue manages the queue of pending jobs and tracks their state. type JobQueue struct { pending *PriorityQueue[string, *JobWithPriority[int]] // Jobs waiting to be processed, ordered by priority inProgress map[string]*inProgressJob // Jobs currently being processed, key is job ID - completed *CircularBuffer[*types.Job] // Last N completed jobs + completed *CircularBuffer[*JobWithStatus] // Last N completed jobs with their status statusMap map[string]types.JobStatus // Maps job ID to its current status mu sync.RWMutex } @@ -53,25 +54,24 @@ func NewJobQueue() *JobQueue { func(a, b *JobWithPriority[int]) bool { return a.Priority > b.Priority // Higher priority first }, - func(a *JobWithPriority[int]) string { - return a.Job.ID - }, + func(j *JobWithPriority[int]) string { return j.ID }, ), inProgress: make(map[string]*inProgressJob), - completed: NewCircularBuffer[*types.Job](defaultCompletedJobsCapacity), + completed: NewCircularBuffer[*JobWithStatus](defaultCompletedJobsCapacity), // Keep last 100 completed jobs statusMap: make(map[string]types.JobStatus), } } +// Exists checks if a job exists in any state and returns its status func (q *JobQueue) Exists(job *types.Job) (types.JobStatus, bool) { q.mu.RLock() defer q.mu.RUnlock() - status, exists := q.statusMap[job.ID] - return status, exists + status, ok := q.statusMap[job.ID] + return status, ok } -// Enqueue adds a new job to the pending queue with a priority +// Enqueue adds a job to the pending queue with the given priority func (q *JobQueue) Enqueue(job *types.Job, priority int) error { q.mu.Lock() defer q.mu.Unlock() @@ -81,56 +81,88 @@ func (q *JobQueue) Enqueue(job *types.Job, priority int) error { return fmt.Errorf("job %s already exists with status %v", job.ID, status) } - jobWithPriority := NewJobWithPriority(job, priority) - q.pending.Push(jobWithPriority) + q.pending.Push(NewJobWithPriority(job, priority)) q.statusMap[job.ID] = types.JobStatusPending return nil } -// Dequeue gets the next available job and assigns it to a builder -func (q *JobQueue) Dequeue(_ string) (*types.Job, bool, error) { +// Dequeue removes and returns the highest priority job from the pending queue +func (q *JobQueue) Dequeue() (*types.Job, bool) { q.mu.Lock() defer q.mu.Unlock() - if q.pending.Len() == 0 { - return nil, false, nil + job, ok := q.pending.Pop() + if !ok { + return nil, false } - jobWithPriority, ok := q.pending.Pop() - if !ok { - return nil, false, nil + q.inProgress[job.ID] = &inProgressJob{ + Job: job.Job, + StartTime: time.Now(), } + q.statusMap[job.ID] = types.JobStatusInProgress - // Add to in-progress with current time - q.inProgress[jobWithPriority.Job.ID] = &inProgressJob{ - job: jobWithPriority.Job, - startTime: time.Now(), + return job.Job, true +} + +// GetInProgressJob retrieves a job that is currently being processed +func (q *JobQueue) GetInProgressJob(id string) (*types.Job, time.Time, bool) { + q.mu.RLock() + defer q.mu.RUnlock() + + if job, ok := q.inProgress[id]; ok { + return job.Job, job.StartTime, true } - q.statusMap[jobWithPriority.Job.ID] = types.JobStatusInProgress + return nil, time.Time{}, false +} - return jobWithPriority.Job, true, nil +// RemoveInProgress removes a job from the in-progress map +func (q *JobQueue) RemoveInProgress(id string) { + q.mu.Lock() + defer q.mu.Unlock() + + delete(q.inProgress, id) } -// MarkComplete moves a job from in-progress to completed -func (q *JobQueue) MarkComplete(jobID string) { +// MarkComplete moves a job from in-progress to completed with the given status +func (q *JobQueue) MarkComplete(id string, status types.JobStatus) { q.mu.Lock() defer q.mu.Unlock() - // Find job in in-progress map - inProgressJob, exists := q.inProgress[jobID] - // if it doesn't exist, it could be previously removed (duplicate job execution) - // or the scheduler may have restarted and not have the job state anymore. - if exists { - // Remove from in-progress - delete(q.inProgress, jobID) + job, ok := q.inProgress[id] + if !ok { + return } - // Add to completed buffer and handle evicted job - if evictedJob, hasEvicted := q.completed.Push(inProgressJob.job); hasEvicted { - // Remove evicted job from status map - delete(q.statusMap, evictedJob.ID) + // Add to completed buffer with status + completedJob := &JobWithStatus{ + Job: job.Job, + Status: status, + CompletedAt: time.Now(), + } + _, _ = q.completed.Push(completedJob) + + // Update status map and clean up + q.statusMap[id] = status + delete(q.inProgress, id) + + // If the job failed, re-enqueue it with its original priority + if status == types.JobStatusFailed { + // Look up the original priority from the pending queue + if origJob, ok := q.pending.Lookup(id); ok { + q.pending.Push(origJob) // Re-add with original priority + q.statusMap[id] = types.JobStatusPending + } } - q.statusMap[jobID] = types.JobStatusComplete +} + +// GetStatus returns the current status of a job +func (q *JobQueue) GetStatus(id string) (types.JobStatus, bool) { + q.mu.RLock() + defer q.mu.RUnlock() + + status, ok := q.statusMap[id] + return status, ok } // SyncJob registers a job as in-progress, used for restoring state after scheduler restarts @@ -140,8 +172,8 @@ func (q *JobQueue) SyncJob(jobID string, _ string, job *types.Job) { // Add directly to in-progress q.inProgress[jobID] = &inProgressJob{ - job: job, - startTime: time.Now(), + Job: job, + StartTime: time.Now(), } q.statusMap[jobID] = types.JobStatusInProgress } diff --git a/pkg/blockbuilder/scheduler/scheduler.go b/pkg/blockbuilder/scheduler/scheduler.go index 7f88f0f45dc6b..08f32b8cc5d69 100644 --- a/pkg/blockbuilder/scheduler/scheduler.go +++ b/pkg/blockbuilder/scheduler/scheduler.go @@ -204,29 +204,27 @@ func (s *BlockScheduler) publishLagMetrics(lag map[int32]kadm.GroupMemberLag) { } } -func (s *BlockScheduler) HandleGetJob(ctx context.Context, builderID string) (*types.Job, bool, error) { +func (s *BlockScheduler) HandleGetJob(ctx context.Context, _ string) (*types.Job, bool, error) { select { case <-ctx.Done(): return nil, false, ctx.Err() default: - return s.queue.Dequeue(builderID) + job, ok := s.queue.Dequeue() + return job, ok, nil } } func (s *BlockScheduler) HandleCompleteJob(_ context.Context, _ string, job *types.Job, success bool) error { logger := log.With(s.logger, "job", job.ID) - status, exists := s.queue.Exists(job) - if !exists { - level.Error(logger).Log("msg", "cannot complete job, job does not exist") + if success { + level.Info(logger).Log("msg", "job completed successfully") + s.queue.MarkComplete(job.ID, types.JobStatusComplete) return nil } - if !success { - level.Error(logger).Log("msg", "job failed, re-enqueuing") - return nil - } - s.queue.MarkComplete(job.ID) + level.Error(logger).Log("msg", "job failed, re-enqueuing") + s.queue.MarkComplete(job.ID, types.JobStatusFailed) return nil } diff --git a/pkg/blockbuilder/types/job.go b/pkg/blockbuilder/types/job.go index 74405b7ad2206..d5909c1d4ad68 100644 --- a/pkg/blockbuilder/types/job.go +++ b/pkg/blockbuilder/types/job.go @@ -21,8 +21,27 @@ const ( JobStatusPending JobStatus = iota JobStatusInProgress JobStatusComplete + JobStatusFailed // Job failed and may be retried + JobStatusExpired // Job failed too many times or is too old ) +func (s JobStatus) String() string { + switch s { + case JobStatusPending: + return "pending" + case JobStatusInProgress: + return "in_progress" + case JobStatusComplete: + return "complete" + case JobStatusFailed: + return "failed" + case JobStatusExpired: + return "expired" + default: + return "unknown" + } +} + // Offsets represents the range of offsets to process type Offsets struct { Min int64 From 118a9b96765e0f89ba38810d62dc0e18f0c587ab Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Tue, 10 Dec 2024 13:29:50 +0800 Subject: [PATCH 04/16] consolidate to `JobWithMetadata` --- pkg/blockbuilder/scheduler/queue.go | 106 ++++++++++---------- pkg/blockbuilder/scheduler/strategy.go | 11 +- pkg/blockbuilder/scheduler/strategy_test.go | 38 ++++--- 3 files changed, 82 insertions(+), 73 deletions(-) diff --git a/pkg/blockbuilder/scheduler/queue.go b/pkg/blockbuilder/scheduler/queue.go index 9e443de379888..5999881d8435e 100644 --- a/pkg/blockbuilder/scheduler/queue.go +++ b/pkg/blockbuilder/scheduler/queue.go @@ -12,38 +12,31 @@ const ( defaultCompletedJobsCapacity = 100 ) -// JobWithPriority wraps a job with its priority -type JobWithPriority[P any] struct { - *types.Job - Priority P -} - -func NewJobWithPriority[P any](job *types.Job, priority P) *JobWithPriority[P] { - return &JobWithPriority[P]{ - Job: job, - Priority: priority, - } -} - -// JobWithStatus wraps a job with its completion status and time -type JobWithStatus struct { +// JobWithMetadata wraps a job with additional metadata for tracking its lifecycle +type JobWithMetadata struct { *types.Job + Priority int Status types.JobStatus - CompletedAt time.Time + StartTime time.Time + UpdateTime time.Time } -// inProgressJob tracks a job that is currently being processed -type inProgressJob struct { - *types.Job - StartTime time.Time +// NewJobWithMetadata creates a new JobWithMetadata instance +func NewJobWithMetadata(job *types.Job, priority int) *JobWithMetadata { + return &JobWithMetadata{ + Job: job, + Priority: priority, + Status: types.JobStatusPending, + UpdateTime: time.Now(), + } } // JobQueue manages the queue of pending jobs and tracks their state. type JobQueue struct { - pending *PriorityQueue[string, *JobWithPriority[int]] // Jobs waiting to be processed, ordered by priority - inProgress map[string]*inProgressJob // Jobs currently being processed, key is job ID - completed *CircularBuffer[*JobWithStatus] // Last N completed jobs with their status - statusMap map[string]types.JobStatus // Maps job ID to its current status + pending *PriorityQueue[string, *JobWithMetadata] // Jobs waiting to be processed, ordered by priority + inProgress map[string]*JobWithMetadata // Jobs currently being processed + completed *CircularBuffer[*JobWithMetadata] // Last N completed jobs + statusMap map[string]types.JobStatus // Maps job ID to its current status mu sync.RWMutex } @@ -51,13 +44,13 @@ type JobQueue struct { func NewJobQueue() *JobQueue { return &JobQueue{ pending: NewPriorityQueue( - func(a, b *JobWithPriority[int]) bool { + func(a, b *JobWithMetadata) bool { return a.Priority > b.Priority // Higher priority first }, - func(j *JobWithPriority[int]) string { return j.ID }, + func(j *JobWithMetadata) string { return j.ID }, ), - inProgress: make(map[string]*inProgressJob), - completed: NewCircularBuffer[*JobWithStatus](defaultCompletedJobsCapacity), // Keep last 100 completed jobs + inProgress: make(map[string]*JobWithMetadata), + completed: NewCircularBuffer[*JobWithMetadata](defaultCompletedJobsCapacity), statusMap: make(map[string]types.JobStatus), } } @@ -81,7 +74,8 @@ func (q *JobQueue) Enqueue(job *types.Job, priority int) error { return fmt.Errorf("job %s already exists with status %v", job.ID, status) } - q.pending.Push(NewJobWithPriority(job, priority)) + jobMeta := NewJobWithMetadata(job, priority) + q.pending.Push(jobMeta) q.statusMap[job.ID] = types.JobStatusPending return nil } @@ -91,18 +85,20 @@ func (q *JobQueue) Dequeue() (*types.Job, bool) { q.mu.Lock() defer q.mu.Unlock() - job, ok := q.pending.Pop() + jobMeta, ok := q.pending.Pop() if !ok { return nil, false } - q.inProgress[job.ID] = &inProgressJob{ - Job: job.Job, - StartTime: time.Now(), - } - q.statusMap[job.ID] = types.JobStatusInProgress + // Update metadata for in-progress state + jobMeta.Status = types.JobStatusInProgress + jobMeta.StartTime = time.Now() + jobMeta.UpdateTime = jobMeta.StartTime - return job.Job, true + q.inProgress[jobMeta.ID] = jobMeta + q.statusMap[jobMeta.ID] = types.JobStatusInProgress + + return jobMeta.Job, true } // GetInProgressJob retrieves a job that is currently being processed @@ -110,8 +106,8 @@ func (q *JobQueue) GetInProgressJob(id string) (*types.Job, time.Time, bool) { q.mu.RLock() defer q.mu.RUnlock() - if job, ok := q.inProgress[id]; ok { - return job.Job, job.StartTime, true + if jobMeta, ok := q.inProgress[id]; ok { + return jobMeta.Job, jobMeta.StartTime, true } return nil, time.Time{}, false } @@ -129,18 +125,17 @@ func (q *JobQueue) MarkComplete(id string, status types.JobStatus) { q.mu.Lock() defer q.mu.Unlock() - job, ok := q.inProgress[id] + jobMeta, ok := q.inProgress[id] if !ok { return } - // Add to completed buffer with status - completedJob := &JobWithStatus{ - Job: job.Job, - Status: status, - CompletedAt: time.Now(), - } - _, _ = q.completed.Push(completedJob) + // Update metadata for completion + jobMeta.Status = status + jobMeta.UpdateTime = time.Now() + + // Add to completed buffer + _, _ = q.completed.Push(jobMeta) // Update status map and clean up q.statusMap[id] = status @@ -148,11 +143,10 @@ func (q *JobQueue) MarkComplete(id string, status types.JobStatus) { // If the job failed, re-enqueue it with its original priority if status == types.JobStatusFailed { - // Look up the original priority from the pending queue - if origJob, ok := q.pending.Lookup(id); ok { - q.pending.Push(origJob) // Re-add with original priority - q.statusMap[id] = types.JobStatusPending - } + // Create new metadata for the re-enqueued job + newJobMeta := NewJobWithMetadata(jobMeta.Job, jobMeta.Priority) + q.pending.Push(newJobMeta) + q.statusMap[id] = types.JobStatusPending } } @@ -171,9 +165,13 @@ func (q *JobQueue) SyncJob(jobID string, _ string, job *types.Job) { defer q.mu.Unlock() // Add directly to in-progress - q.inProgress[jobID] = &inProgressJob{ - Job: job, - StartTime: time.Now(), + jobMeta := &JobWithMetadata{ + Job: job, + Priority: 0, // Priority is not known in this case + Status: types.JobStatusInProgress, + StartTime: time.Now(), + UpdateTime: time.Now(), } + q.inProgress[jobID] = jobMeta q.statusMap[jobID] = types.JobStatusInProgress } diff --git a/pkg/blockbuilder/scheduler/strategy.go b/pkg/blockbuilder/scheduler/strategy.go index 75719140a4ea0..b6afa163dc515 100644 --- a/pkg/blockbuilder/scheduler/strategy.go +++ b/pkg/blockbuilder/scheduler/strategy.go @@ -18,7 +18,7 @@ type OffsetReader interface { type Planner interface { Name() string - Plan(ctx context.Context) ([]*JobWithPriority[int], error) + Plan(ctx context.Context) ([]*JobWithMetadata, error) } const ( @@ -46,14 +46,14 @@ func (p *RecordCountPlanner) Name() string { return RecordCountStrategy } -func (p *RecordCountPlanner) Plan(ctx context.Context) ([]*JobWithPriority[int], error) { +func (p *RecordCountPlanner) Plan(ctx context.Context) ([]*JobWithMetadata, error) { offsets, err := p.offsetReader.GroupLag(ctx) if err != nil { level.Error(p.logger).Log("msg", "failed to get group lag", "err", err) return nil, err } - jobs := make([]*JobWithPriority[int], 0, len(offsets)) + jobs := make([]*JobWithMetadata, 0, len(offsets)) for _, partitionOffset := range offsets { // kadm.GroupMemberLag contains valid Commit.At even when consumer group never committed any offset. // no additional validation is needed here @@ -69,11 +69,12 @@ func (p *RecordCountPlanner) Plan(ctx context.Context) ([]*JobWithPriority[int], for currentStart := startOffset; currentStart < endOffset; { currentEnd := min(currentStart+p.targetRecordCount, endOffset) - job := NewJobWithPriority( + job := NewJobWithMetadata( types.NewJob(partitionOffset.Partition, types.Offsets{ Min: currentStart, Max: currentEnd, - }), int(endOffset-currentStart), // priority is remaining records to process + }), + int(endOffset-currentStart), // priority is remaining records to process ) jobs = append(jobs, job) diff --git a/pkg/blockbuilder/scheduler/strategy_test.go b/pkg/blockbuilder/scheduler/strategy_test.go index 9c7b732fb4e08..6771aaf9868c5 100644 --- a/pkg/blockbuilder/scheduler/strategy_test.go +++ b/pkg/blockbuilder/scheduler/strategy_test.go @@ -19,11 +19,19 @@ func (m *mockOffsetReader) GroupLag(_ context.Context) (map[int32]kadm.GroupMemb return m.groupLag, nil } +// compareJobs compares two JobWithMetadata instances ignoring UpdateTime +func compareJobs(t *testing.T, expected, actual *JobWithMetadata) { + require.Equal(t, expected.Job, actual.Job) + require.Equal(t, expected.Priority, actual.Priority) + require.Equal(t, expected.Status, actual.Status) + require.Equal(t, expected.StartTime, actual.StartTime) +} + func TestRecordCountPlanner_Plan(t *testing.T) { for _, tc := range []struct { name string recordCount int64 - expectedJobs []*JobWithPriority[int] + expectedJobs []*JobWithMetadata groupLag map[int32]kadm.GroupMemberLag }{ { @@ -40,8 +48,8 @@ func TestRecordCountPlanner_Plan(t *testing.T) { Partition: 0, }, }, - expectedJobs: []*JobWithPriority[int]{ - NewJobWithPriority( + expectedJobs: []*JobWithMetadata{ + NewJobWithMetadata( types.NewJob(0, types.Offsets{Min: 101, Max: 150}), 49, // 150-101 ), @@ -61,12 +69,12 @@ func TestRecordCountPlanner_Plan(t *testing.T) { Partition: 0, }, }, - expectedJobs: []*JobWithPriority[int]{ - NewJobWithPriority( + expectedJobs: []*JobWithMetadata{ + NewJobWithMetadata( types.NewJob(0, types.Offsets{Min: 101, Max: 151}), 99, // priority is total remaining: 200-101 ), - NewJobWithPriority( + NewJobWithMetadata( types.NewJob(0, types.Offsets{Min: 151, Max: 200}), 49, // priority is total remaining: 200-151 ), @@ -95,19 +103,19 @@ func TestRecordCountPlanner_Plan(t *testing.T) { Partition: 1, }, }, - expectedJobs: []*JobWithPriority[int]{ - NewJobWithPriority( + expectedJobs: []*JobWithMetadata{ + NewJobWithMetadata( + types.NewJob(0, types.Offsets{Min: 101, Max: 150}), + 49, // priority is total remaining: 150-101 + ), + NewJobWithMetadata( types.NewJob(1, types.Offsets{Min: 201, Max: 301}), 199, // priority is total remaining: 400-201 ), - NewJobWithPriority( + NewJobWithMetadata( types.NewJob(1, types.Offsets{Min: 301, Max: 400}), 99, // priority is total remaining: 400-301 ), - NewJobWithPriority( - types.NewJob(0, types.Offsets{Min: 101, Max: 150}), - 49, // priority is total remaining: 150-101 - ), }, }, { @@ -145,7 +153,9 @@ func TestRecordCountPlanner_Plan(t *testing.T) { require.NoError(t, err) require.Equal(t, len(tc.expectedJobs), len(jobs)) - require.ElementsMatch(t, tc.expectedJobs, jobs) + for i := range tc.expectedJobs { + compareJobs(t, tc.expectedJobs[i], jobs[i]) + } }) } } From 2608b00482fe9451c0217f844a9e5997cda79ee8 Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Tue, 10 Dec 2024 13:32:45 +0800 Subject: [PATCH 05/16] remove old finished jobs from status map to avoid leaks --- pkg/blockbuilder/scheduler/queue.go | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/pkg/blockbuilder/scheduler/queue.go b/pkg/blockbuilder/scheduler/queue.go index 5999881d8435e..928ecfc214985 100644 --- a/pkg/blockbuilder/scheduler/queue.go +++ b/pkg/blockbuilder/scheduler/queue.go @@ -15,10 +15,10 @@ const ( // JobWithMetadata wraps a job with additional metadata for tracking its lifecycle type JobWithMetadata struct { *types.Job - Priority int - Status types.JobStatus - StartTime time.Time - UpdateTime time.Time + Priority int + Status types.JobStatus + StartTime time.Time + UpdateTime time.Time } // NewJobWithMetadata creates a new JobWithMetadata instance @@ -135,7 +135,10 @@ func (q *JobQueue) MarkComplete(id string, status types.JobStatus) { jobMeta.UpdateTime = time.Now() // Add to completed buffer - _, _ = q.completed.Push(jobMeta) + if old, evicted := q.completed.Push(jobMeta); evicted { + // If the buffer is full, evict the oldest job and remove it from the status map to avoid leaks + delete(q.statusMap, old.ID) + } // Update status map and clean up q.statusMap[id] = status From 02155592b4263b196e52ea40aa6e3fdd67906cc2 Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Tue, 10 Dec 2024 14:01:25 +0800 Subject: [PATCH 06/16] job fields are private to force constructor & accessor usage --- pkg/blockbuilder/builder/builder.go | 16 +++++------ pkg/blockbuilder/scheduler/queue.go | 16 +++++------ pkg/blockbuilder/scheduler/scheduler.go | 12 ++++----- pkg/blockbuilder/scheduler/scheduler_test.go | 6 ++--- pkg/blockbuilder/scheduler/strategy.go | 4 +-- pkg/blockbuilder/types/grpc_transport.go | 14 +++++----- pkg/blockbuilder/types/job.go | 28 +++++++++++++------- 7 files changed, 52 insertions(+), 44 deletions(-) diff --git a/pkg/blockbuilder/builder/builder.go b/pkg/blockbuilder/builder/builder.go index d8784bf568d99..ad981e3183d0e 100644 --- a/pkg/blockbuilder/builder/builder.go +++ b/pkg/blockbuilder/builder/builder.go @@ -250,13 +250,13 @@ func (i *BlockBuilder) runOne(ctx context.Context, workerID string) (bool, error logger := log.With( i.logger, "worker_id", workerID, - "partition", job.Partition, - "job_min_offset", job.Offsets.Min, - "job_max_offset", job.Offsets.Max, + "partition", job.Partition(), + "job_min_offset", job.Offsets().Min, + "job_max_offset", job.Offsets().Max, ) i.jobsMtx.Lock() - i.inflightJobs[job.ID] = job + i.inflightJobs[job.ID()] = job i.metrics.inflightJobs.Set(float64(len(i.inflightJobs))) i.jobsMtx.Unlock() @@ -284,7 +284,7 @@ func (i *BlockBuilder) runOne(ctx context.Context, workerID string) (bool, error } i.jobsMtx.Lock() - delete(i.inflightJobs, job.ID) + delete(i.inflightJobs, job.ID()) i.metrics.inflightJobs.Set(float64(len(i.inflightJobs))) i.jobsMtx.Unlock() @@ -315,7 +315,7 @@ func (i *BlockBuilder) processJob(ctx context.Context, job *types.Job, logger lo "load records", 1, func(ctx context.Context) error { - lastOffset, err = i.loadRecords(ctx, job.Partition, job.Offsets, inputCh) + lastOffset, err = i.loadRecords(ctx, job.Partition(), job.Offsets(), inputCh) return err }, func(ctx context.Context) error { @@ -323,7 +323,7 @@ func (i *BlockBuilder) processJob(ctx context.Context, job *types.Job, logger lo "msg", "finished loading records", "ctx_error", ctx.Err(), "last_offset", lastOffset, - "total_records", lastOffset-job.Offsets.Min, + "total_records", lastOffset-job.Offsets().Min, ) close(inputCh) return nil @@ -488,7 +488,7 @@ func (i *BlockBuilder) processJob(ctx context.Context, job *types.Job, logger lo } } - if lastOffset <= job.Offsets.Min { + if lastOffset <= job.Offsets().Min { return lastOffset, nil } diff --git a/pkg/blockbuilder/scheduler/queue.go b/pkg/blockbuilder/scheduler/queue.go index 928ecfc214985..f82e0b789b221 100644 --- a/pkg/blockbuilder/scheduler/queue.go +++ b/pkg/blockbuilder/scheduler/queue.go @@ -47,7 +47,7 @@ func NewJobQueue() *JobQueue { func(a, b *JobWithMetadata) bool { return a.Priority > b.Priority // Higher priority first }, - func(j *JobWithMetadata) string { return j.ID }, + func(j *JobWithMetadata) string { return j.ID() }, ), inProgress: make(map[string]*JobWithMetadata), completed: NewCircularBuffer[*JobWithMetadata](defaultCompletedJobsCapacity), @@ -60,7 +60,7 @@ func (q *JobQueue) Exists(job *types.Job) (types.JobStatus, bool) { q.mu.RLock() defer q.mu.RUnlock() - status, ok := q.statusMap[job.ID] + status, ok := q.statusMap[job.ID()] return status, ok } @@ -70,13 +70,13 @@ func (q *JobQueue) Enqueue(job *types.Job, priority int) error { defer q.mu.Unlock() // Check if job already exists - if status, exists := q.statusMap[job.ID]; exists { - return fmt.Errorf("job %s already exists with status %v", job.ID, status) + if status, exists := q.statusMap[job.ID()]; exists { + return fmt.Errorf("job %s already exists with status %v", job.ID(), status) } jobMeta := NewJobWithMetadata(job, priority) q.pending.Push(jobMeta) - q.statusMap[job.ID] = types.JobStatusPending + q.statusMap[job.ID()] = types.JobStatusPending return nil } @@ -95,8 +95,8 @@ func (q *JobQueue) Dequeue() (*types.Job, bool) { jobMeta.StartTime = time.Now() jobMeta.UpdateTime = jobMeta.StartTime - q.inProgress[jobMeta.ID] = jobMeta - q.statusMap[jobMeta.ID] = types.JobStatusInProgress + q.inProgress[jobMeta.ID()] = jobMeta + q.statusMap[jobMeta.ID()] = types.JobStatusInProgress return jobMeta.Job, true } @@ -137,7 +137,7 @@ func (q *JobQueue) MarkComplete(id string, status types.JobStatus) { // Add to completed buffer if old, evicted := q.completed.Push(jobMeta); evicted { // If the buffer is full, evict the oldest job and remove it from the status map to avoid leaks - delete(q.statusMap, old.ID) + delete(q.statusMap, old.ID()) } // Update status map and clean up diff --git a/pkg/blockbuilder/scheduler/scheduler.go b/pkg/blockbuilder/scheduler/scheduler.go index 08f32b8cc5d69..38a2badcae170 100644 --- a/pkg/blockbuilder/scheduler/scheduler.go +++ b/pkg/blockbuilder/scheduler/scheduler.go @@ -147,7 +147,7 @@ func (s *BlockScheduler) runOnce(ctx context.Context) error { logger := log.With( s.logger, - "job", job.Job.ID, + "job", job.Job.ID(), "priority", job.Priority, ) @@ -174,7 +174,7 @@ func (s *BlockScheduler) runOnce(ctx context.Context) error { "msg", "job is pending, updating priority", "old_priority", job.Priority, ) - s.queue.pending.UpdatePriority(job.Job.ID, job) + s.queue.pending.UpdatePriority(job.Job.ID(), job) case types.JobStatusInProgress: level.Debug(s.logger).Log( "msg", "job is in progress, ignoring", @@ -215,20 +215,20 @@ func (s *BlockScheduler) HandleGetJob(ctx context.Context, _ string) (*types.Job } func (s *BlockScheduler) HandleCompleteJob(_ context.Context, _ string, job *types.Job, success bool) error { - logger := log.With(s.logger, "job", job.ID) + logger := log.With(s.logger, "job", job.ID()) if success { level.Info(logger).Log("msg", "job completed successfully") - s.queue.MarkComplete(job.ID, types.JobStatusComplete) + s.queue.MarkComplete(job.ID(), types.JobStatusComplete) return nil } level.Error(logger).Log("msg", "job failed, re-enqueuing") - s.queue.MarkComplete(job.ID, types.JobStatusFailed) + s.queue.MarkComplete(job.ID(), types.JobStatusFailed) return nil } func (s *BlockScheduler) HandleSyncJob(_ context.Context, builderID string, job *types.Job) error { - s.queue.SyncJob(job.ID, builderID, job) + s.queue.SyncJob(job.ID(), builderID, job) return nil } diff --git a/pkg/blockbuilder/scheduler/scheduler_test.go b/pkg/blockbuilder/scheduler/scheduler_test.go index bc72d985f39b6..00a9fade3932d 100644 --- a/pkg/blockbuilder/scheduler/scheduler_test.go +++ b/pkg/blockbuilder/scheduler/scheduler_test.go @@ -51,8 +51,8 @@ func TestScheduleAndProcessJob(t *testing.T) { if !ok { t.Fatal("expected to receive job") } - if receivedJob.ID != job.ID { - t.Errorf("got job ID %s, want %s", receivedJob.ID, job.ID) + if receivedJob.ID() != job.ID() { + t.Errorf("got job ID %s, want %s", receivedJob.ID(), job.ID()) } // Builder completes job @@ -124,7 +124,7 @@ func TestMultipleBuilders(t *testing.T) { } // Verify different jobs were assigned - if receivedJob1.ID == receivedJob2.ID { + if receivedJob1.ID() == receivedJob2.ID() { t.Error("builders received same job") } diff --git a/pkg/blockbuilder/scheduler/strategy.go b/pkg/blockbuilder/scheduler/strategy.go index b6afa163dc515..d4d14bff0e44a 100644 --- a/pkg/blockbuilder/scheduler/strategy.go +++ b/pkg/blockbuilder/scheduler/strategy.go @@ -84,8 +84,8 @@ func (p *RecordCountPlanner) Plan(ctx context.Context) ([]*JobWithMetadata, erro // Sort jobs by partition then priority sort.Slice(jobs, func(i, j int) bool { - if jobs[i].Job.Partition != jobs[j].Job.Partition { - return jobs[i].Job.Partition < jobs[j].Job.Partition + if jobs[i].Job.Partition() != jobs[j].Job.Partition() { + return jobs[i].Job.Partition() < jobs[j].Job.Partition() } return jobs[i].Priority > jobs[j].Priority }) diff --git a/pkg/blockbuilder/types/grpc_transport.go b/pkg/blockbuilder/types/grpc_transport.go index 3b90ba9f20f06..4d52bdfc7745e 100644 --- a/pkg/blockbuilder/types/grpc_transport.go +++ b/pkg/blockbuilder/types/grpc_transport.go @@ -110,9 +110,9 @@ func protoToJob(p *proto.Job) *Job { return nil } return &Job{ - ID: p.GetId(), - Partition: p.GetPartition(), - Offsets: Offsets{ + id: p.GetId(), + partition: p.GetPartition(), + offsets: Offsets{ Min: p.GetOffsets().GetMin(), Max: p.GetOffsets().GetMax(), }, @@ -125,11 +125,11 @@ func jobToProto(j *Job) *proto.Job { return nil } return &proto.Job{ - Id: j.ID, - Partition: j.Partition, + Id: j.ID(), + Partition: j.Partition(), Offsets: &proto.Offsets{ - Min: j.Offsets.Min, - Max: j.Offsets.Max, + Min: j.offsets.Min, + Max: j.offsets.Max, }, } } diff --git a/pkg/blockbuilder/types/job.go b/pkg/blockbuilder/types/job.go index d5909c1d4ad68..ef5a2079cacb2 100644 --- a/pkg/blockbuilder/types/job.go +++ b/pkg/blockbuilder/types/job.go @@ -4,14 +4,22 @@ import "fmt" // Job represents a block building task. type Job struct { - ID string + id string // Partition and offset information - Partition int32 - Offsets Offsets + partition int32 + offsets Offsets } -func (j *Job) GetID() string { - return j.ID +func (j *Job) ID() string { + return j.id +} + +func (j *Job) Partition() int32 { + return j.partition +} + +func (j *Job) Offsets() Offsets { + return j.offsets } // JobStatus represents the current state of a job @@ -21,8 +29,8 @@ const ( JobStatusPending JobStatus = iota JobStatusInProgress JobStatusComplete - JobStatusFailed // Job failed and may be retried - JobStatusExpired // Job failed too many times or is too old + JobStatusFailed // Job failed and may be retried + JobStatusExpired // Job failed too many times or is too old ) func (s JobStatus) String() string { @@ -51,9 +59,9 @@ type Offsets struct { // NewJob creates a new job with the given partition and offsets func NewJob(partition int32, offsets Offsets) *Job { return &Job{ - ID: GenerateJobID(partition, offsets), - Partition: partition, - Offsets: offsets, + id: GenerateJobID(partition, offsets), + partition: partition, + offsets: offsets, } } From 5576b0c9f33c7d5e5f7a636f31e3057e12982297 Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Tue, 10 Dec 2024 14:05:37 +0800 Subject: [PATCH 07/16] removes unused builderID from scheduler interfaces --- pkg/blockbuilder/scheduler/queue.go | 2 +- pkg/blockbuilder/scheduler/scheduler.go | 8 ++++---- pkg/blockbuilder/types/interfaces.go | 6 +++--- pkg/blockbuilder/types/scheduler_server.go | 6 +++--- pkg/blockbuilder/types/transport.go | 6 +++--- 5 files changed, 14 insertions(+), 14 deletions(-) diff --git a/pkg/blockbuilder/scheduler/queue.go b/pkg/blockbuilder/scheduler/queue.go index f82e0b789b221..490faef01b00c 100644 --- a/pkg/blockbuilder/scheduler/queue.go +++ b/pkg/blockbuilder/scheduler/queue.go @@ -163,7 +163,7 @@ func (q *JobQueue) GetStatus(id string) (types.JobStatus, bool) { } // SyncJob registers a job as in-progress, used for restoring state after scheduler restarts -func (q *JobQueue) SyncJob(jobID string, _ string, job *types.Job) { +func (q *JobQueue) SyncJob(jobID string, job *types.Job) { q.mu.Lock() defer q.mu.Unlock() diff --git a/pkg/blockbuilder/scheduler/scheduler.go b/pkg/blockbuilder/scheduler/scheduler.go index 38a2badcae170..e7a1642cb045c 100644 --- a/pkg/blockbuilder/scheduler/scheduler.go +++ b/pkg/blockbuilder/scheduler/scheduler.go @@ -204,7 +204,7 @@ func (s *BlockScheduler) publishLagMetrics(lag map[int32]kadm.GroupMemberLag) { } } -func (s *BlockScheduler) HandleGetJob(ctx context.Context, _ string) (*types.Job, bool, error) { +func (s *BlockScheduler) HandleGetJob(ctx context.Context) (*types.Job, bool, error) { select { case <-ctx.Done(): return nil, false, ctx.Err() @@ -214,7 +214,7 @@ func (s *BlockScheduler) HandleGetJob(ctx context.Context, _ string) (*types.Job } } -func (s *BlockScheduler) HandleCompleteJob(_ context.Context, _ string, job *types.Job, success bool) error { +func (s *BlockScheduler) HandleCompleteJob(_ context.Context, job *types.Job, success bool) error { logger := log.With(s.logger, "job", job.ID()) if success { @@ -228,7 +228,7 @@ func (s *BlockScheduler) HandleCompleteJob(_ context.Context, _ string, job *typ return nil } -func (s *BlockScheduler) HandleSyncJob(_ context.Context, builderID string, job *types.Job) error { - s.queue.SyncJob(job.ID(), builderID, job) +func (s *BlockScheduler) HandleSyncJob(_ context.Context, job *types.Job) error { + s.queue.SyncJob(job.ID(), job) return nil } diff --git a/pkg/blockbuilder/types/interfaces.go b/pkg/blockbuilder/types/interfaces.go index 2144e83878cb5..5ed51b39caf2f 100644 --- a/pkg/blockbuilder/types/interfaces.go +++ b/pkg/blockbuilder/types/interfaces.go @@ -15,11 +15,11 @@ type BuilderTransport interface { // SchedulerHandler defines the business logic for handling builder requests type SchedulerHandler interface { // HandleGetJob processes a request for a new job - HandleGetJob(ctx context.Context, builderID string) (*Job, bool, error) + HandleGetJob(ctx context.Context) (*Job, bool, error) // HandleCompleteJob processes a job completion notification - HandleCompleteJob(ctx context.Context, builderID string, job *Job, success bool) error + HandleCompleteJob(ctx context.Context, job *Job, success bool) error // HandleSyncJob processes a job sync request - HandleSyncJob(ctx context.Context, builderID string, job *Job) error + HandleSyncJob(ctx context.Context, job *Job) error } // Request/Response message types diff --git a/pkg/blockbuilder/types/scheduler_server.go b/pkg/blockbuilder/types/scheduler_server.go index c2756903859f2..671687b9d78f7 100644 --- a/pkg/blockbuilder/types/scheduler_server.go +++ b/pkg/blockbuilder/types/scheduler_server.go @@ -21,7 +21,7 @@ func NewSchedulerServer(handler SchedulerHandler) proto.SchedulerServiceServer { // GetJob implements proto.SchedulerServiceServer func (s *schedulerServer) GetJob(ctx context.Context, req *proto.GetJobRequest) (*proto.GetJobResponse, error) { - job, ok, err := s.handler.HandleGetJob(ctx, req.BuilderId) + job, ok, err := s.handler.HandleGetJob(ctx) if err != nil { return nil, status.Error(codes.Internal, err.Error()) } @@ -39,7 +39,7 @@ func (s *schedulerServer) GetJob(ctx context.Context, req *proto.GetJobRequest) // CompleteJob implements proto.SchedulerServiceServer func (s *schedulerServer) CompleteJob(ctx context.Context, req *proto.CompleteJobRequest) (*proto.CompleteJobResponse, error) { - if err := s.handler.HandleCompleteJob(ctx, req.BuilderId, protoToJob(req.Job), req.Success); err != nil { + if err := s.handler.HandleCompleteJob(ctx, protoToJob(req.Job), req.Success); err != nil { return nil, status.Error(codes.Internal, err.Error()) } return &proto.CompleteJobResponse{}, nil @@ -47,7 +47,7 @@ func (s *schedulerServer) CompleteJob(ctx context.Context, req *proto.CompleteJo // SyncJob implements proto.SchedulerServiceServer func (s *schedulerServer) SyncJob(ctx context.Context, req *proto.SyncJobRequest) (*proto.SyncJobResponse, error) { - if err := s.handler.HandleSyncJob(ctx, req.BuilderId, protoToJob(req.Job)); err != nil { + if err := s.handler.HandleSyncJob(ctx, protoToJob(req.Job)); err != nil { return nil, status.Error(codes.Internal, err.Error()) } return &proto.SyncJobResponse{}, nil diff --git a/pkg/blockbuilder/types/transport.go b/pkg/blockbuilder/types/transport.go index 6f7dc41e394fe..a036a8ac1e233 100644 --- a/pkg/blockbuilder/types/transport.go +++ b/pkg/blockbuilder/types/transport.go @@ -37,7 +37,7 @@ func NewMemoryTransport(scheduler SchedulerHandler) *MemoryTransport { } func (t *MemoryTransport) SendGetJobRequest(ctx context.Context, req *GetJobRequest) (*GetJobResponse, error) { - job, ok, err := t.scheduler.HandleGetJob(ctx, req.BuilderID) + job, ok, err := t.scheduler.HandleGetJob(ctx) if err != nil { return nil, err } @@ -48,9 +48,9 @@ func (t *MemoryTransport) SendGetJobRequest(ctx context.Context, req *GetJobRequ } func (t *MemoryTransport) SendCompleteJob(ctx context.Context, req *CompleteJobRequest) error { - return t.scheduler.HandleCompleteJob(ctx, req.BuilderID, req.Job, req.Success) + return t.scheduler.HandleCompleteJob(ctx, req.Job, req.Success) } func (t *MemoryTransport) SendSyncJob(ctx context.Context, req *SyncJobRequest) error { - return t.scheduler.HandleSyncJob(ctx, req.BuilderID, req.Job) + return t.scheduler.HandleSyncJob(ctx, req.Job) } From d14f9d63c61a1e5bc11e9444ac96c2b17cdb47a2 Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Tue, 10 Dec 2024 14:16:01 +0800 Subject: [PATCH 08/16] sync job updates timestamps --- pkg/blockbuilder/scheduler/queue.go | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/pkg/blockbuilder/scheduler/queue.go b/pkg/blockbuilder/scheduler/queue.go index 490faef01b00c..4cf1d14f1dd03 100644 --- a/pkg/blockbuilder/scheduler/queue.go +++ b/pkg/blockbuilder/scheduler/queue.go @@ -162,12 +162,22 @@ func (q *JobQueue) GetStatus(id string) (types.JobStatus, bool) { return status, ok } -// SyncJob registers a job as in-progress, used for restoring state after scheduler restarts +// SyncJob registers a job as in-progress or updates its UpdateTime if already in progress func (q *JobQueue) SyncJob(jobID string, job *types.Job) { + // Check if job exists and is in progress + if status, exists := q.Exists(job); exists && status == types.JobStatusInProgress { + q.mu.Lock() + if existingJob, ok := q.inProgress[jobID]; ok { + existingJob.UpdateTime = time.Now() + } + q.mu.Unlock() + return + } + q.mu.Lock() defer q.mu.Unlock() - // Add directly to in-progress + // Add new job to in-progress jobMeta := &JobWithMetadata{ Job: job, Priority: 0, // Priority is not known in this case From 67fc8ac9b4b7c6bd27f55276958cc28642d326db Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Tue, 10 Dec 2024 16:03:35 +0800 Subject: [PATCH 09/16] circular buffer lookup & metadata exists supporting fns --- .../scheduler/prioritiy_queue_test.go | 51 +++++++++++++++++++ pkg/blockbuilder/scheduler/priority_queue.go | 13 +++++ pkg/blockbuilder/scheduler/queue.go | 28 +++++++++- pkg/blockbuilder/types/job.go | 3 +- 4 files changed, 92 insertions(+), 3 deletions(-) diff --git a/pkg/blockbuilder/scheduler/prioritiy_queue_test.go b/pkg/blockbuilder/scheduler/prioritiy_queue_test.go index 05b8ee07679b8..6845e29dbbb3d 100644 --- a/pkg/blockbuilder/scheduler/prioritiy_queue_test.go +++ b/pkg/blockbuilder/scheduler/prioritiy_queue_test.go @@ -250,3 +250,54 @@ func TestCircularBuffer(t *testing.T) { }) } } + +func TestCircularBufferLookup(t *testing.T) { + t.Run("empty buffer", func(t *testing.T) { + cb := NewCircularBuffer[int](5) + _, ok := cb.Lookup(func(i int) bool { return i == 1 }) + require.False(t, ok) + }) + + t.Run("single element", func(t *testing.T) { + cb := NewCircularBuffer[int](5) + cb.Push(1) + v, ok := cb.Lookup(func(i int) bool { return i == 1 }) + require.True(t, ok) + require.Equal(t, 1, v) + }) + + t.Run("multiple elements", func(t *testing.T) { + cb := NewCircularBuffer[int](5) + for i := 1; i <= 3; i++ { + cb.Push(i) + } + v, ok := cb.Lookup(func(i int) bool { return i == 2 }) + require.True(t, ok) + require.Equal(t, 2, v) + }) + + t.Run("wrapped buffer", func(t *testing.T) { + cb := NewCircularBuffer[int](3) + // Push 5 elements into a buffer of size 3, causing wrap-around + for i := 1; i <= 5; i++ { + cb.Push(i) + } + // Buffer should now contain [4,5,3] with head at index 2 + v, ok := cb.Lookup(func(i int) bool { return i == 4 }) + require.True(t, ok) + require.Equal(t, 4, v) + + // Element that was evicted should not be found + _, ok = cb.Lookup(func(i int) bool { return i == 1 }) + require.False(t, ok) + }) + + t.Run("no match", func(t *testing.T) { + cb := NewCircularBuffer[int](5) + for i := 1; i <= 3; i++ { + cb.Push(i) + } + _, ok := cb.Lookup(func(i int) bool { return i == 99 }) + require.False(t, ok) + }) +} diff --git a/pkg/blockbuilder/scheduler/priority_queue.go b/pkg/blockbuilder/scheduler/priority_queue.go index 248f8ff2d98b6..913a63ce85e87 100644 --- a/pkg/blockbuilder/scheduler/priority_queue.go +++ b/pkg/blockbuilder/scheduler/priority_queue.go @@ -191,3 +191,16 @@ func (b *CircularBuffer[V]) Pop() (V, bool) { func (b *CircularBuffer[V]) Len() int { return b.size } + +// returns the first element in the buffer that satisfies the given predicate +func (b *CircularBuffer[V]) Lookup(f func(V) bool) (V, bool) { + for i := 0; i < b.size; i++ { + idx := (b.head + i) % len(b.buffer) + if f(b.buffer[idx]) { + return b.buffer[idx], true + } + + } + var zero V + return zero, false +} diff --git a/pkg/blockbuilder/scheduler/queue.go b/pkg/blockbuilder/scheduler/queue.go index 4cf1d14f1dd03..854351511df29 100644 --- a/pkg/blockbuilder/scheduler/queue.go +++ b/pkg/blockbuilder/scheduler/queue.go @@ -60,8 +60,32 @@ func (q *JobQueue) Exists(job *types.Job) (types.JobStatus, bool) { q.mu.RLock() defer q.mu.RUnlock() - status, ok := q.statusMap[job.ID()] - return status, ok + x, ok := q.existsLockLess(job.ID()) + if !ok { + return types.JobStatusUnknown, false + } + return x.Status, ok +} + +func (q *JobQueue) existsLockLess(id string) (*JobWithMetadata, bool) { + status, ok := q.statusMap[id] + if !ok { + return nil, false + } + + switch status { + case types.JobStatusPending: + return q.pending.Lookup(id) + case types.JobStatusInProgress: + res, ok := q.inProgress[id] + return res, ok + case types.JobStatusComplete: + return q.completed.Lookup(func(jwm *JobWithMetadata) bool { + return jwm.ID() == id + }) + default: + return nil, false + } } // Enqueue adds a job to the pending queue with the given priority diff --git a/pkg/blockbuilder/types/job.go b/pkg/blockbuilder/types/job.go index ef5a2079cacb2..ca23aa003b96e 100644 --- a/pkg/blockbuilder/types/job.go +++ b/pkg/blockbuilder/types/job.go @@ -26,7 +26,8 @@ func (j *Job) Offsets() Offsets { type JobStatus int const ( - JobStatusPending JobStatus = iota + JobStatusUnknown JobStatus = iota // zero value, largely unused + JobStatusPending JobStatusInProgress JobStatusComplete JobStatusFailed // Job failed and may be retried From e6ed2e4c43bc8e54c2b1fa665a6d833a230281c8 Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Tue, 10 Dec 2024 16:35:44 +0800 Subject: [PATCH 10/16] jobqueue logging, syncjob, default priority --- pkg/blockbuilder/scheduler/queue.go | 75 ++++++++++++++++++----------- pkg/loki/modules.go | 2 +- 2 files changed, 47 insertions(+), 30 deletions(-) diff --git a/pkg/blockbuilder/scheduler/queue.go b/pkg/blockbuilder/scheduler/queue.go index 854351511df29..0ca9d2dbff209 100644 --- a/pkg/blockbuilder/scheduler/queue.go +++ b/pkg/blockbuilder/scheduler/queue.go @@ -5,10 +5,14 @@ import ( "sync" "time" + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/grafana/loki/v3/pkg/blockbuilder/types" ) const ( + DefaultPriority = 0 // TODO(owen-d): better determine priority when unknown defaultCompletedJobsCapacity = 100 ) @@ -33,6 +37,7 @@ func NewJobWithMetadata(job *types.Job, priority int) *JobWithMetadata { // JobQueue manages the queue of pending jobs and tracks their state. type JobQueue struct { + logger log.Logger pending *PriorityQueue[string, *JobWithMetadata] // Jobs waiting to be processed, ordered by priority inProgress map[string]*JobWithMetadata // Jobs currently being processed completed *CircularBuffer[*JobWithMetadata] // Last N completed jobs @@ -40,9 +45,9 @@ type JobQueue struct { mu sync.RWMutex } -// NewJobQueue creates a new job queue instance -func NewJobQueue() *JobQueue { +func NewJobQueueWithLogger(logger log.Logger) *JobQueue { return &JobQueue{ + logger: logger, pending: NewPriorityQueue( func(a, b *JobWithMetadata) bool { return a.Priority > b.Priority // Higher priority first @@ -55,6 +60,11 @@ func NewJobQueue() *JobQueue { } } +// NewJobQueue creates a new job queue instance +func NewJobQueue() *JobQueue { + return NewJobQueueWithLogger(log.NewNopLogger()) +} + // Exists checks if a job exists in any state and returns its status func (q *JobQueue) Exists(job *types.Job) (types.JobStatus, bool) { q.mu.RLock() @@ -177,38 +187,45 @@ func (q *JobQueue) MarkComplete(id string, status types.JobStatus) { } } -// GetStatus returns the current status of a job -func (q *JobQueue) GetStatus(id string) (types.JobStatus, bool) { - q.mu.RLock() - defer q.mu.RUnlock() - - status, ok := q.statusMap[id] - return status, ok -} - // SyncJob registers a job as in-progress or updates its UpdateTime if already in progress func (q *JobQueue) SyncJob(jobID string, job *types.Job) { - // Check if job exists and is in progress - if status, exists := q.Exists(job); exists && status == types.JobStatusInProgress { - q.mu.Lock() - if existingJob, ok := q.inProgress[jobID]; ok { - existingJob.UpdateTime = time.Now() - } - q.mu.Unlock() - return - } - q.mu.Lock() defer q.mu.Unlock() - // Add new job to in-progress - jobMeta := &JobWithMetadata{ - Job: job, - Priority: 0, // Priority is not known in this case - Status: types.JobStatusInProgress, - StartTime: time.Now(), - UpdateTime: time.Now(), + // Helper function to create a new job + registerInProgress := func() { + // Job does not exist; add it as in-progress + now := time.Now() + jobMeta := NewJobWithMetadata(job, DefaultPriority) + jobMeta.StartTime = now + jobMeta.UpdateTime = now + jobMeta.Status = types.JobStatusInProgress + q.inProgress[jobID] = jobMeta + } + + jobMeta, ok := q.existsLockLess(jobID) + + if !ok { + registerInProgress() + return + } + + switch jobMeta.Status { + case types.JobStatusPending: + // Job already pending, move to in-progress + _, ok := q.pending.Remove(jobID) + if !ok { + level.Error(q.logger).Log("msg", "failed to remove job from pending queue", "job", jobID) + } + case types.JobStatusInProgress: + case types.JobStatusComplete, types.JobStatusFailed, types.JobStatusExpired: + // Job already completed, re-enqueue a new one + registerInProgress() + default: + registerInProgress() } + q.inProgress[jobID] = jobMeta - q.statusMap[jobID] = types.JobStatusInProgress + jobMeta.Status = types.JobStatusInProgress + } diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index 395a891e62103..e30136c2b8637 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -1866,7 +1866,7 @@ func (t *Loki) initBlockScheduler() (services.Service, error) { s := blockscheduler.NewScheduler( t.Cfg.BlockScheduler, - blockscheduler.NewJobQueue(), + blockscheduler.NewJobQueueWithLogger(logger), offsetManager, logger, prometheus.DefaultRegisterer, From d7e90621712aeb8d9bb4d55edb258e4037c1198e Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Tue, 10 Dec 2024 17:00:20 +0800 Subject: [PATCH 11/16] scheduler job completion --- pkg/blockbuilder/scheduler/queue.go | 37 ++++++++++---------- pkg/blockbuilder/scheduler/scheduler.go | 23 +++++++++--- pkg/blockbuilder/scheduler/scheduler_test.go | 28 ++++++++++++++- 3 files changed, 65 insertions(+), 23 deletions(-) diff --git a/pkg/blockbuilder/scheduler/queue.go b/pkg/blockbuilder/scheduler/queue.go index 0ca9d2dbff209..2b9fc3ebcb75d 100644 --- a/pkg/blockbuilder/scheduler/queue.go +++ b/pkg/blockbuilder/scheduler/queue.go @@ -159,31 +159,32 @@ func (q *JobQueue) MarkComplete(id string, status types.JobStatus) { q.mu.Lock() defer q.mu.Unlock() - jobMeta, ok := q.inProgress[id] + jobMeta, ok := q.existsLockLess(id) if !ok { + level.Error(q.logger).Log("msg", "failed to mark job as complete", "job", id, "status", status) return } - // Update metadata for completion - jobMeta.Status = status - jobMeta.UpdateTime = time.Now() - - // Add to completed buffer - if old, evicted := q.completed.Push(jobMeta); evicted { - // If the buffer is full, evict the oldest job and remove it from the status map to avoid leaks - delete(q.statusMap, old.ID()) + switch jobMeta.Status { + case types.JobStatusInProgress: + // update & remove from in progress + delete(q.inProgress, id) + case types.JobStatusPending: + _, ok := q.pending.Remove(id) + if !ok { + level.Error(q.logger).Log("msg", "failed to remove job from pending queue", "job", id) + } + default: + level.Error(q.logger).Log("msg", "unknown job status, cannot mark as complete", "job", id, "status", status) } - // Update status map and clean up - q.statusMap[id] = status - delete(q.inProgress, id) + jobMeta.Status = status + jobMeta.UpdateTime = time.Now() - // If the job failed, re-enqueue it with its original priority - if status == types.JobStatusFailed { - // Create new metadata for the re-enqueued job - newJobMeta := NewJobWithMetadata(jobMeta.Job, jobMeta.Priority) - q.pending.Push(newJobMeta) - q.statusMap[id] = types.JobStatusPending + // add it to the completed buffer, removing any evicted job from the statusMap + removal, evicted := q.completed.Push(jobMeta) + if evicted { + delete(q.statusMap, removal.ID()) } } diff --git a/pkg/blockbuilder/scheduler/scheduler.go b/pkg/blockbuilder/scheduler/scheduler.go index e7a1642cb045c..5ebc6cc11e3e1 100644 --- a/pkg/blockbuilder/scheduler/scheduler.go +++ b/pkg/blockbuilder/scheduler/scheduler.go @@ -214,17 +214,32 @@ func (s *BlockScheduler) HandleGetJob(ctx context.Context) (*types.Job, bool, er } } -func (s *BlockScheduler) HandleCompleteJob(_ context.Context, job *types.Job, success bool) error { +func (s *BlockScheduler) HandleCompleteJob(ctx context.Context, job *types.Job, success bool) (err error) { logger := log.With(s.logger, "job", job.ID()) if success { - level.Info(logger).Log("msg", "job completed successfully") - s.queue.MarkComplete(job.ID(), types.JobStatusComplete) - return nil + // TODO(owen-d): do i need to increment offset here? + if err = s.offsetManager.Commit( + ctx, + job.Partition(), + job.Offsets().Max, + ); err == nil { + s.queue.MarkComplete(job.ID(), types.JobStatusComplete) + level.Info(logger).Log("msg", "job completed successfully") + return nil + } + + level.Error(logger).Log("msg", "failed to commit offset", "err", err) } level.Error(logger).Log("msg", "job failed, re-enqueuing") s.queue.MarkComplete(job.ID(), types.JobStatusFailed) + s.queue.pending.Push( + NewJobWithMetadata( + job, + DefaultPriority, + ), + ) return nil } diff --git a/pkg/blockbuilder/scheduler/scheduler_test.go b/pkg/blockbuilder/scheduler/scheduler_test.go index 00a9fade3932d..142e46731e87b 100644 --- a/pkg/blockbuilder/scheduler/scheduler_test.go +++ b/pkg/blockbuilder/scheduler/scheduler_test.go @@ -7,8 +7,10 @@ import ( "github.com/go-kit/log" "github.com/prometheus/client_golang/prometheus" + "github.com/twmb/franz-go/pkg/kadm" "github.com/grafana/loki/v3/pkg/blockbuilder/types" + "github.com/grafana/loki/v3/pkg/kafka/partition" ) type testEnv struct { @@ -18,9 +20,33 @@ type testEnv struct { builder *Worker } +type mockOffsetManager struct { + topic string + consumerGroup string +} + +func (m *mockOffsetManager) Topic() string { return m.topic } +func (m *mockOffsetManager) ConsumerGroup() string { return m.consumerGroup } +func (m *mockOffsetManager) GroupLag(ctx context.Context, lookbackPeriod time.Duration) (map[int32]kadm.GroupMemberLag, error) { + return nil, nil +} +func (m *mockOffsetManager) FetchLastCommittedOffset(ctx context.Context, partition int32) (int64, error) { + return 0, nil +} +func (m *mockOffsetManager) FetchPartitionOffset(ctx context.Context, partition int32, position partition.SpecialOffset) (int64, error) { + return 0, nil +} +func (m *mockOffsetManager) Commit(ctx context.Context, partition int32, offset int64) error { + return nil +} + func newTestEnv(builderID string) *testEnv { queue := NewJobQueue() - scheduler := NewScheduler(Config{}, queue, nil, log.NewNopLogger(), prometheus.NewRegistry()) + mockOffsetMgr := &mockOffsetManager{ + topic: "test-topic", + consumerGroup: "test-group", + } + scheduler := NewScheduler(Config{}, queue, mockOffsetMgr, log.NewNopLogger(), prometheus.NewRegistry()) transport := types.NewMemoryTransport(scheduler) builder := NewWorker(builderID, transport) From db1406a3a419625c965f2172c7010e6a05e4bd80 Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Tue, 10 Dec 2024 17:06:14 +0800 Subject: [PATCH 12/16] linting --- pkg/blockbuilder/scheduler/scheduler_test.go | 8 ++++---- pkg/blockbuilder/types/scheduler_server.go | 2 +- pkg/blockbuilder/types/transport.go | 2 +- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/pkg/blockbuilder/scheduler/scheduler_test.go b/pkg/blockbuilder/scheduler/scheduler_test.go index 142e46731e87b..f13c6d49485c1 100644 --- a/pkg/blockbuilder/scheduler/scheduler_test.go +++ b/pkg/blockbuilder/scheduler/scheduler_test.go @@ -27,16 +27,16 @@ type mockOffsetManager struct { func (m *mockOffsetManager) Topic() string { return m.topic } func (m *mockOffsetManager) ConsumerGroup() string { return m.consumerGroup } -func (m *mockOffsetManager) GroupLag(ctx context.Context, lookbackPeriod time.Duration) (map[int32]kadm.GroupMemberLag, error) { +func (m *mockOffsetManager) GroupLag(_ context.Context, _ time.Duration) (map[int32]kadm.GroupMemberLag, error) { return nil, nil } -func (m *mockOffsetManager) FetchLastCommittedOffset(ctx context.Context, partition int32) (int64, error) { +func (m *mockOffsetManager) FetchLastCommittedOffset(_ context.Context, _ int32) (int64, error) { return 0, nil } -func (m *mockOffsetManager) FetchPartitionOffset(ctx context.Context, partition int32, position partition.SpecialOffset) (int64, error) { +func (m *mockOffsetManager) FetchPartitionOffset(_ context.Context, _ int32, _ partition.SpecialOffset) (int64, error) { return 0, nil } -func (m *mockOffsetManager) Commit(ctx context.Context, partition int32, offset int64) error { +func (m *mockOffsetManager) Commit(_ context.Context, _ int32, _ int64) error { return nil } diff --git a/pkg/blockbuilder/types/scheduler_server.go b/pkg/blockbuilder/types/scheduler_server.go index 671687b9d78f7..a5deaa276d622 100644 --- a/pkg/blockbuilder/types/scheduler_server.go +++ b/pkg/blockbuilder/types/scheduler_server.go @@ -20,7 +20,7 @@ func NewSchedulerServer(handler SchedulerHandler) proto.SchedulerServiceServer { } // GetJob implements proto.SchedulerServiceServer -func (s *schedulerServer) GetJob(ctx context.Context, req *proto.GetJobRequest) (*proto.GetJobResponse, error) { +func (s *schedulerServer) GetJob(ctx context.Context, _ *proto.GetJobRequest) (*proto.GetJobResponse, error) { job, ok, err := s.handler.HandleGetJob(ctx) if err != nil { return nil, status.Error(codes.Internal, err.Error()) diff --git a/pkg/blockbuilder/types/transport.go b/pkg/blockbuilder/types/transport.go index a036a8ac1e233..ac8917b854c36 100644 --- a/pkg/blockbuilder/types/transport.go +++ b/pkg/blockbuilder/types/transport.go @@ -36,7 +36,7 @@ func NewMemoryTransport(scheduler SchedulerHandler) *MemoryTransport { } } -func (t *MemoryTransport) SendGetJobRequest(ctx context.Context, req *GetJobRequest) (*GetJobResponse, error) { +func (t *MemoryTransport) SendGetJobRequest(ctx context.Context, _ *GetJobRequest) (*GetJobResponse, error) { job, ok, err := t.scheduler.HandleGetJob(ctx) if err != nil { return nil, err From de67588a5af70957e990c986c66364d051d0b9b7 Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Tue, 10 Dec 2024 17:12:16 +0800 Subject: [PATCH 13/16] scheduler queue fixes & tests --- pkg/blockbuilder/scheduler/queue.go | 9 +- pkg/blockbuilder/scheduler/queue_test.go | 163 +++++++++++++++++++++++ 2 files changed, 170 insertions(+), 2 deletions(-) create mode 100644 pkg/blockbuilder/scheduler/queue_test.go diff --git a/pkg/blockbuilder/scheduler/queue.go b/pkg/blockbuilder/scheduler/queue.go index 2b9fc3ebcb75d..1aeb15e8395e5 100644 --- a/pkg/blockbuilder/scheduler/queue.go +++ b/pkg/blockbuilder/scheduler/queue.go @@ -186,6 +186,7 @@ func (q *JobQueue) MarkComplete(id string, status types.JobStatus) { if evicted { delete(q.statusMap, removal.ID()) } + q.statusMap[id] = status } // SyncJob registers a job as in-progress or updates its UpdateTime if already in progress @@ -202,6 +203,7 @@ func (q *JobQueue) SyncJob(jobID string, job *types.Job) { jobMeta.UpdateTime = now jobMeta.Status = types.JobStatusInProgress q.inProgress[jobID] = jobMeta + q.statusMap[jobID] = types.JobStatusInProgress } jobMeta, ok := q.existsLockLess(jobID) @@ -218,15 +220,18 @@ func (q *JobQueue) SyncJob(jobID string, job *types.Job) { if !ok { level.Error(q.logger).Log("msg", "failed to remove job from pending queue", "job", jobID) } + jobMeta.Status = types.JobStatusInProgress case types.JobStatusInProgress: case types.JobStatusComplete, types.JobStatusFailed, types.JobStatusExpired: // Job already completed, re-enqueue a new one registerInProgress() + return default: registerInProgress() + return } + jobMeta.UpdateTime = time.Now() q.inProgress[jobID] = jobMeta - jobMeta.Status = types.JobStatusInProgress - + q.statusMap[jobID] = types.JobStatusInProgress } diff --git a/pkg/blockbuilder/scheduler/queue_test.go b/pkg/blockbuilder/scheduler/queue_test.go new file mode 100644 index 0000000000000..dfbe07681c62a --- /dev/null +++ b/pkg/blockbuilder/scheduler/queue_test.go @@ -0,0 +1,163 @@ +package scheduler + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/grafana/loki/v3/pkg/blockbuilder/types" +) + +func TestJobQueue_SyncJob(t *testing.T) { + t.Run("non-existent to in-progress", func(t *testing.T) { + q := NewJobQueue() + job := types.NewJob(1, types.Offsets{Min: 100, Max: 200}) + jobID := job.ID() + + beforeSync := time.Now() + q.SyncJob(jobID, job) + afterSync := time.Now() + + // Verify job is in in-progress map + jobMeta, ok := q.inProgress[jobID] + require.True(t, ok, "job should be in in-progress map") + require.Equal(t, types.JobStatusInProgress, jobMeta.Status) + require.True(t, jobMeta.StartTime.After(beforeSync) || jobMeta.StartTime.Equal(beforeSync)) + require.True(t, jobMeta.StartTime.Before(afterSync) || jobMeta.StartTime.Equal(afterSync)) + }) + + t.Run("pending to in-progress", func(t *testing.T) { + q := NewJobQueue() + job := types.NewJob(1, types.Offsets{Min: 100, Max: 200}) + + // Start with pending job + err := q.Enqueue(job, DefaultPriority) + require.NoError(t, err) + + beforeSync := time.Now() + q.SyncJob(job.ID(), job) + afterSync := time.Now() + + // Verify job moved from pending to in-progress + _, ok := q.pending.Lookup(job.ID()) + require.False(t, ok, "job should not be in pending queue") + + jobMeta, ok := q.inProgress[job.ID()] + require.True(t, ok, "job should be in in-progress map") + require.Equal(t, types.JobStatusInProgress, jobMeta.Status) + require.True(t, jobMeta.UpdateTime.After(beforeSync) || jobMeta.UpdateTime.Equal(beforeSync)) + require.True(t, jobMeta.UpdateTime.Before(afterSync) || jobMeta.UpdateTime.Equal(afterSync)) + }) + + t.Run("already in-progress", func(t *testing.T) { + q := NewJobQueue() + job := types.NewJob(1, types.Offsets{Min: 100, Max: 200}) + + // First sync to put in in-progress + q.SyncJob(job.ID(), job) + firstUpdate := q.inProgress[job.ID()].UpdateTime + + time.Sleep(time.Millisecond) // Ensure time difference + beforeSecondSync := time.Now() + q.SyncJob(job.ID(), job) + afterSecondSync := time.Now() + + jobMeta := q.inProgress[job.ID()] + require.True(t, jobMeta.UpdateTime.After(firstUpdate), "UpdateTime should be updated") + require.True(t, jobMeta.UpdateTime.After(beforeSecondSync) || jobMeta.UpdateTime.Equal(beforeSecondSync)) + require.True(t, jobMeta.UpdateTime.Before(afterSecondSync) || jobMeta.UpdateTime.Equal(afterSecondSync)) + }) +} + +func TestJobQueue_MarkComplete(t *testing.T) { + t.Run("in-progress to complete", func(t *testing.T) { + q := NewJobQueue() + job := types.NewJob(1, types.Offsets{Min: 100, Max: 200}) + + // Start with in-progress job + q.SyncJob(job.ID(), job) + + beforeComplete := time.Now() + q.MarkComplete(job.ID(), types.JobStatusComplete) + afterComplete := time.Now() + + // Verify job moved to completed buffer + var foundJob *JobWithMetadata + q.completed.Lookup(func(j *JobWithMetadata) bool { + if j.ID() == job.ID() { + foundJob = j + return true + } + return false + }) + require.NotNil(t, foundJob, "job should be in completed buffer") + require.Equal(t, types.JobStatusComplete, foundJob.Status) + require.True(t, foundJob.UpdateTime.After(beforeComplete) || foundJob.UpdateTime.Equal(beforeComplete)) + require.True(t, foundJob.UpdateTime.Before(afterComplete) || foundJob.UpdateTime.Equal(afterComplete)) + + // Verify removed from in-progress + _, ok := q.inProgress[job.ID()] + require.False(t, ok, "job should not be in in-progress map") + }) + + t.Run("pending to complete", func(t *testing.T) { + q := NewJobQueue() + job := types.NewJob(1, types.Offsets{Min: 100, Max: 200}) + + // Start with pending job + err := q.Enqueue(job, DefaultPriority) + require.NoError(t, err) + + q.MarkComplete(job.ID(), types.JobStatusComplete) + + // Verify job not in pending + _, ok := q.pending.Lookup(job.ID()) + require.False(t, ok, "job should not be in pending queue") + + // Verify job in completed buffer + var foundJob *JobWithMetadata + q.completed.Lookup(func(j *JobWithMetadata) bool { + if j.ID() == job.ID() { + foundJob = j + return true + } + return false + }) + require.NotNil(t, foundJob, "job should be in completed buffer") + require.Equal(t, types.JobStatusComplete, foundJob.Status) + }) + + t.Run("non-existent job", func(t *testing.T) { + q := NewJobQueue() + logger := &testLogger{t: t} + q.logger = logger + + q.MarkComplete("non-existent", types.JobStatusComplete) + // Should log error but not panic + }) + + t.Run("already completed job", func(t *testing.T) { + q := NewJobQueue() + logger := &testLogger{t: t} + q.logger = logger + + job := types.NewJob(1, types.Offsets{Min: 100, Max: 200}) + q.SyncJob(job.ID(), job) + q.MarkComplete(job.ID(), types.JobStatusComplete) + + // Try to complete again + q.MarkComplete(job.ID(), types.JobStatusComplete) + // Should log error but not panic + }) +} + +// testLogger implements log.Logger for testing +type testLogger struct { + t *testing.T +} + +func (l *testLogger) Log(keyvals ...interface{}) error { + l.t.Log(keyvals...) + return nil +} From f096e8b0f1af49d6e0761b52ee63f67923ff9384 Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Wed, 11 Dec 2024 12:55:25 +0800 Subject: [PATCH 14/16] removes unused priority heap key map --- pkg/blockbuilder/scheduler/priority_queue.go | 6 ------ 1 file changed, 6 deletions(-) diff --git a/pkg/blockbuilder/scheduler/priority_queue.go b/pkg/blockbuilder/scheduler/priority_queue.go index 913a63ce85e87..e43591d99db1c 100644 --- a/pkg/blockbuilder/scheduler/priority_queue.go +++ b/pkg/blockbuilder/scheduler/priority_queue.go @@ -22,7 +22,6 @@ func NewPriorityQueue[K comparable, V any](less func(V, V) bool, key func(V) K) h := &priorityHeap[V]{ less: less, heap: make([]*item[V], 0), - idx: make(map[int]*item[V]), } heap.Init(h) return &PriorityQueue[K, V]{ @@ -100,7 +99,6 @@ func (pq *PriorityQueue[K, V]) Len() int { type priorityHeap[V any] struct { less func(V, V) bool heap []*item[V] - idx map[int]*item[V] // Maps index to item for efficient updates } func (h *priorityHeap[V]) Len() int { @@ -115,15 +113,12 @@ func (h *priorityHeap[V]) Swap(i, j int) { h.heap[i], h.heap[j] = h.heap[j], h.heap[i] h.heap[i].index = i h.heap[j].index = j - h.idx[i] = h.heap[i] - h.idx[j] = h.heap[j] } func (h *priorityHeap[V]) Push(x any) { it := x.(*item[V]) it.index = len(h.heap) h.heap = append(h.heap, it) - h.idx[it.index] = it } func (h *priorityHeap[V]) Pop() any { @@ -131,7 +126,6 @@ func (h *priorityHeap[V]) Pop() any { n := len(old) it := old[n-1] h.heap = old[0 : n-1] - delete(h.idx, it.index) return it } From 497ac91ac626cf19c196de7fcd510ca8fada0793 Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Wed, 11 Dec 2024 13:02:39 +0800 Subject: [PATCH 15/16] removes duplicate index-setting code in priority queue heap --- pkg/blockbuilder/scheduler/priority_queue.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pkg/blockbuilder/scheduler/priority_queue.go b/pkg/blockbuilder/scheduler/priority_queue.go index e43591d99db1c..86b2c795f2eb2 100644 --- a/pkg/blockbuilder/scheduler/priority_queue.go +++ b/pkg/blockbuilder/scheduler/priority_queue.go @@ -42,8 +42,7 @@ func (pq *PriorityQueue[K, V]) Push(v V) { } // Add new item - idx := pq.h.Len() - it := &item[V]{value: v, index: idx} + it := &item[V]{value: v} pq.m[k] = it heap.Push(pq.h, it) } From c6ed241b168646e63ed9e562d19b461d4445970e Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Wed, 11 Dec 2024 13:06:10 +0800 Subject: [PATCH 16/16] commit max-1 in block-scheduler (max offset is exclusive) --- pkg/blockbuilder/scheduler/scheduler.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pkg/blockbuilder/scheduler/scheduler.go b/pkg/blockbuilder/scheduler/scheduler.go index 5ebc6cc11e3e1..5e55e3123420d 100644 --- a/pkg/blockbuilder/scheduler/scheduler.go +++ b/pkg/blockbuilder/scheduler/scheduler.go @@ -218,11 +218,10 @@ func (s *BlockScheduler) HandleCompleteJob(ctx context.Context, job *types.Job, logger := log.With(s.logger, "job", job.ID()) if success { - // TODO(owen-d): do i need to increment offset here? if err = s.offsetManager.Commit( ctx, job.Partition(), - job.Offsets().Max, + job.Offsets().Max-1, // max is exclusive, so commit max-1 ); err == nil { s.queue.MarkComplete(job.ID(), types.JobStatusComplete) level.Info(logger).Log("msg", "job completed successfully")