Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(block-scheduler): job tracking & offset commits #15338

Merged
merged 16 commits into from
Dec 11, 2024

Conversation

owen-d
Copy link
Member

@owen-d owen-d commented Dec 10, 2024

Block Builder Scheduler Updates

Key improvements to the scheduler component focused on jobs and state management:

Job Handling

  • Better state transitions with some test coverage
  • Improved job metadata organization

Queue Improvements

  • Added key lookups to priority queue
  • Circular buffer for completed jobs
  • Better timestamp and status handling

Other Changes

  • Cleaned up scheduler interfaces
  • Better transport layer separation

No migration needed - internal changes only. Next up: metrics, docs, and retry policies.

@owen-d owen-d requested a review from a team as a code owner December 10, 2024 09:15
logger := log.With(s.logger, "job", job.ID())

if success {
// TODO(owen-d): do i need to increment offset here?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think we need to commit job.Offsets().Max-1 given builders only consume upto but not including Max

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sounds good

if err = s.offsetManager.Commit(
ctx,
job.Partition(),
job.Offsets().Max,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

next jobs would start with job.Offsets().Max

Suggested change
job.Offsets().Max,
job.Offsets().Max-1,

s.queue.pending.Push(
NewJobWithMetadata(
job,
DefaultPriority,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't failed jobs instead get a higher priority?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but I didn't want to complicate the PR further by adding logic for priority discovery on retries. I think this is best left for a followup PR.

type priorityHeap[V any] struct {
less func(V, V) bool
heap []*item[V]
idx map[int]*item[V] // Maps index to item for efficient updates
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we use this for lookups? i don't see any references

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good call 👍


// Add new item
idx := pq.h.Len()
it := &item[V]{value: v, index: idx}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there is a bit of duplication, we update the index of the item here and within the heap's Push() method.

Copy link
Contributor

@ashwanthgoli ashwanthgoli left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lgtm

@ashwanthgoli ashwanthgoli merged commit f2bff77 into grafana:main Dec 11, 2024
58 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants