Skip to content

Commit

Permalink
chore: add status page for block scheduler (#15553)
Browse files Browse the repository at this point in the history
  • Loading branch information
ashwanthgoli authored Dec 27, 2024
1 parent 52d745f commit a3a5acd
Show file tree
Hide file tree
Showing 7 changed files with 325 additions and 0 deletions.
13 changes: 13 additions & 0 deletions pkg/blockbuilder/scheduler/priority_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,11 @@ func (pq *PriorityQueue[K, V]) Len() int {
return pq.h.Len()
}

// List returns all elements in the queue.
func (pq *PriorityQueue[K, V]) List() []V {
return pq.h.List()
}

// priorityHeap is the internal heap implementation that satisfies heap.Interface.
type priorityHeap[V any] struct {
less func(V, V) bool
Expand All @@ -108,6 +113,14 @@ func (h *priorityHeap[V]) Less(i, j int) bool {
return h.less(h.heap[i].value, h.heap[j].value)
}

func (h *priorityHeap[V]) List() []V {
vals := make([]V, 0, len(h.heap))
for _, item := range h.heap {
vals = append(vals, item.value)
}
return vals
}

func (h *priorityHeap[V]) Swap(i, j int) {
h.heap[i], h.heap[j] = h.heap[j], h.heap[i]
h.heap[i].index = i
Expand Down
39 changes: 39 additions & 0 deletions pkg/blockbuilder/scheduler/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,3 +268,42 @@ func (q *JobQueue) SyncJob(jobID string, job *types.Job) {
q.inProgress[jobID] = jobMeta
q.statusMap[jobID] = types.JobStatusInProgress
}

func (q *JobQueue) ListPendingJobs() []JobWithMetadata {
q.mu.RLock()
defer q.mu.RUnlock()

// return copies of the jobs since they can change after the lock is released
jobs := make([]JobWithMetadata, 0, q.pending.Len())
for _, j := range q.pending.List() {
jobs = append(jobs, JobWithMetadata{
// Job is immutable, no need to make a copy
Job: j.Job,
Priority: j.Priority,
Status: j.Status,
StartTime: j.StartTime,
UpdateTime: j.UpdateTime,
})
}

return jobs
}

func (q *JobQueue) ListInProgressJobs() []JobWithMetadata {
q.mu.RLock()
defer q.mu.RUnlock()

// return copies of the jobs since they can change after the lock is released
jobs := make([]JobWithMetadata, 0, len(q.inProgress))
for _, j := range q.inProgress {
jobs = append(jobs, JobWithMetadata{
// Job is immutable, no need to make a copy
Job: j.Job,
Priority: j.Priority,
Status: j.Status,
StartTime: j.StartTime,
UpdateTime: j.UpdateTime,
})
}
return jobs
}
5 changes: 5 additions & 0 deletions pkg/blockbuilder/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"flag"
"fmt"
"net/http"
"strconv"
"strings"
"time"
Expand Down Expand Up @@ -259,3 +260,7 @@ func (s *BlockScheduler) HandleSyncJob(_ context.Context, job *types.Job) error
s.queue.SyncJob(job.ID(), job)
return nil
}

func (s *BlockScheduler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
newStatusPageHandler(s.queue, s.offsetManager, s.cfg.LookbackPeriod).ServeHTTP(w, req)
}
93 changes: 93 additions & 0 deletions pkg/blockbuilder/scheduler/status.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package scheduler

import (
"context"
_ "embed"
"html/template"
"net/http"
"slices"
"time"

"github.com/twmb/franz-go/pkg/kadm"
)

//go:embed status.gohtml
var defaultPageContent string
var defaultPageTemplate = template.Must(template.New("webpage").Funcs(template.FuncMap{
"durationSince": func(t time.Time) string { return time.Since(t).Truncate(time.Second).String() },
}).Parse(defaultPageContent))

type jobQueue interface {
ListPendingJobs() []JobWithMetadata
ListInProgressJobs() []JobWithMetadata
}

type offsetReader interface {
GroupLag(ctx context.Context, lookbackPeriod time.Duration) (map[int32]kadm.GroupMemberLag, error)
}

type partitionInfo struct {
Partition int32
Lag int64
EndOffset int64
CommittedOffset int64
}

type statusPageHandler struct {
jobQueue jobQueue
offsetReader offsetReader
lookbackPeriod time.Duration
}

func newStatusPageHandler(jobQueue jobQueue, offsetReader offsetReader, lookbackPeriod time.Duration) *statusPageHandler {
return &statusPageHandler{jobQueue: jobQueue, offsetReader: offsetReader, lookbackPeriod: lookbackPeriod}
}

func (h *statusPageHandler) ServeHTTP(w http.ResponseWriter, _ *http.Request) {
offsets, err := h.offsetReader.GroupLag(context.Background(), h.lookbackPeriod)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

pendingJobs := h.jobQueue.ListPendingJobs()
slices.SortFunc(pendingJobs, func(a, b JobWithMetadata) int {
return b.Priority - a.Priority // Higher priority first
})

inProgressJobs := h.jobQueue.ListInProgressJobs()
slices.SortFunc(inProgressJobs, func(a, b JobWithMetadata) int {
return int(a.StartTime.Sub(b.StartTime)) // Earlier start time First
})

data := struct {
PendingJobs []JobWithMetadata
InProgressJobs []JobWithMetadata
Now time.Time
PartitionInfo []partitionInfo
}{
Now: time.Now(),
PendingJobs: pendingJobs,
InProgressJobs: inProgressJobs,
}

for _, partitionOffset := range offsets {
// only include partitions having lag
if partitionOffset.Lag > 0 {
data.PartitionInfo = append(data.PartitionInfo, partitionInfo{
Partition: partitionOffset.Partition,
Lag: partitionOffset.Lag,
EndOffset: partitionOffset.End.Offset,
CommittedOffset: partitionOffset.Commit.At,
})
}
}
slices.SortFunc(data.PartitionInfo, func(a, b partitionInfo) int {
return int(a.Partition - b.Partition)
})

w.Header().Set("Content-Type", "text/html")
if err := defaultPageTemplate.Execute(w, data); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
}
}
82 changes: 82 additions & 0 deletions pkg/blockbuilder/scheduler/status.gohtml
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
{{- /*gotype: github.com/grafana/dskit/ring.httpResponse */ -}}
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>Block Scheduler Status</title>
</head>
<body>
<h1>Block Scheduler Status</h1>
<p>Current time: {{ .Now }}</p>
<h2>Pending Jobs</h2>
<table width="100%" border="1">
<thead>
<tr>
<th>ID</th>
<th>Priority</th>
<th>Partition</th>
<th>Start Offset</th>
<th>End Offset</th>
<th>Creation Timestamp</th>
</tr>
</thead>
<tbody>
{{ range $i, $job := .PendingJobs }}
<td>{{ .ID }}</td>
<td>{{ .Priority }}</td>
<td>{{ .Partition }}</td>
<td>{{ .Offsets.Min }}</td>
<td>{{ .Offsets.Max }}</td>
<td>{{ .UpdateTime | durationSince }} ago ({{ .UpdateTime.Format "Mon, 02 Jan 2006 15:04:05 -0700" }})</td>
</tr>
{{ end }}
</tbody>
</table>
<h2>In progress Jobs</h2>
<table width="100%" border="1">
<thead>
<tr>
<th>ID</th>
<th>Priority</th>
<th>Partition</th>
<th>Start Offset</th>
<th>End Offset</th>
<th>Start Timestamp</th>
<th>Last Updated Timestamp</th>
</tr>
</thead>
<tbody>
{{ range $i, $job := .InProgressJobs }}
<td>{{ .ID }}</td>
<td>{{ .Priority }}</td>
<td>{{ .Partition }}</td>
<td>{{ .Offsets.Min }}</td>
<td>{{ .Offsets.Max }}</td>
<td>{{ .StartTime | durationSince }} ago ({{ .StartTime.Format "Mon, 02 Jan 2006 15:04:05 -0700" }})</td>
<td>{{ .UpdateTime | durationSince }} ago ({{ .UpdateTime.Format "Mon, 02 Jan 2006 15:04:05 -0700" }})</td>
</tr>
{{ end }}
</tbody>
</table>
<h3>Partition Lag</h2>
<table width="100%" border="1">
<thead>
<tr>
<th>Partition</th>
<th>Lag</th>
<th>End offset</th>
<th>Committed offset</th>
</tr>
</thead>
<tbody>
{{ range .PartitionInfo }}
<td>{{ .Partition }}</td>
<td>{{ .Lag }}</td>
<td>{{ .EndOffset }}</td>
<td>{{ .CommittedOffset }}</td>
</tr>
{{ end }}
</tbody>
</table>
</body>
</html>
91 changes: 91 additions & 0 deletions pkg/blockbuilder/scheduler/status_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package scheduler

import (
"net/http"
"net/http/httptest"
"os"
"testing"
"time"

"github.com/twmb/franz-go/pkg/kadm"

"github.com/grafana/loki/v3/pkg/blockbuilder/types"
)

type mockQueueLister struct {
pendingJobs []JobWithMetadata
inProgressJobs []JobWithMetadata
}

func (m *mockQueueLister) ListPendingJobs() []JobWithMetadata {
return m.pendingJobs
}

func (m *mockQueueLister) ListInProgressJobs() []JobWithMetadata {
return m.inProgressJobs
}

func TestStatusPageHandler_ServeHTTP(t *testing.T) {
t.Skip("skipping. only added to inspect the generated status page.")

// Setup mock data
mockLister := &mockQueueLister{
pendingJobs: []JobWithMetadata{
{Job: types.NewJob(11, types.Offsets{Min: 11, Max: 20}), UpdateTime: time.Now().Add(-2 * time.Hour), Priority: 23},
{Job: types.NewJob(22, types.Offsets{Min: 21, Max: 30}), UpdateTime: time.Now().Add(-1 * time.Hour), Priority: 42},
{Job: types.NewJob(33, types.Offsets{Min: 22, Max: 40}), UpdateTime: time.Now().Add(-1 * time.Hour), Priority: 11},
},
inProgressJobs: []JobWithMetadata{
{Job: types.NewJob(0, types.Offsets{Min: 1, Max: 10}), StartTime: time.Now().Add(-4 * time.Hour), UpdateTime: time.Now().Add(-3 * time.Hour)},
{Job: types.NewJob(1, types.Offsets{Min: 11, Max: 110}), StartTime: time.Now().Add(-5 * time.Hour), UpdateTime: time.Now().Add(-4 * time.Hour)},
},
}

mockReader := &mockOffsetReader{
groupLag: map[int32]kadm.GroupMemberLag{
0: {
Lag: 10,
Partition: 3,
End: kadm.ListedOffset{Offset: 100},
Commit: kadm.Offset{At: 90},
},
1: {
Lag: 0,
Partition: 1,
End: kadm.ListedOffset{Offset: 100},
Commit: kadm.Offset{At: 100},
},
2: {
Lag: 233,
Partition: 2,
End: kadm.ListedOffset{Offset: 333},
Commit: kadm.Offset{At: 100},
},
},
}

handler := newStatusPageHandler(mockLister, mockReader, time.Hour)
req := httptest.NewRequest(http.MethodGet, "/test", nil)
w := httptest.NewRecorder()
handler.ServeHTTP(w, req)

resp := w.Result()
defer resp.Body.Close()

// Verify status code
if resp.StatusCode != http.StatusOK {
t.Errorf("expected status OK; got %v", resp.StatusCode)
}

// Verify content type
contentType := resp.Header.Get("Content-Type")
if contentType != "text/html" {
t.Errorf("expected Content-Type text/html; got %v", contentType)
}

// Write response body to file for inspection
err := os.WriteFile("/tmp/generated_status.html", w.Body.Bytes(), 0644)
if err != nil {
t.Errorf("failed to write response body to file: %v", err)
}
}
2 changes: 2 additions & 0 deletions pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -1888,6 +1888,8 @@ func (t *Loki) initBlockScheduler() (services.Service, error) {
return nil, err
}

t.Server.HTTP.Path("/blockscheduler/status").Methods("GET").Handler(s)

blockprotos.RegisterSchedulerServiceServer(
t.Server.GRPC,
blocktypes.NewSchedulerServer(s),
Expand Down

0 comments on commit a3a5acd

Please sign in to comment.