feat(block-scheduler): adds service and basic planner support for scheduler (#15200)
ashwanthgoli authored Dec 3, 2024
1 parent f598389 commit ad322c0
Showing 12 changed files with 834 additions and 20 deletions.
20 changes: 20 additions & 0 deletions docs/sources/shared/configuration.md
@@ -188,6 +188,26 @@ block_builder:
# CLI flag: -blockbuilder.backoff..backoff-retries
[max_retries: <int> | default = 10]

block_scheduler:
# Consumer group used by block scheduler to track the last consumed offset.
# CLI flag: -block-scheduler.consumer-group
[consumer_group: <string> | default = "block-scheduler"]

# How often the scheduler should plan jobs.
# CLI flag: -block-scheduler.interval
[interval: <duration> | default = 5m]

# Period used by the planner to calculate the start and end offsets such that
# each job consumes records spanning the target period.
# CLI flag: -block-scheduler.target-records-spanning-period
[target_records_spanning_period: <duration> | default = 1h]

# Lookback period in milliseconds used by the scheduler to plan jobs when the
# consumer group has no commits. -1 consumes from the latest offset. -2
# consumes from the start of the partition.
# CLI flag: -block-scheduler.lookback-period
[lookback_period: <int> | default = -2]

pattern_ingester:
# Whether the pattern ingester is enabled.
# CLI flag: -pattern-ingester.enabled
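For reference, a minimal sketch of the new block_scheduler section documented above, in the same YAML form the configuration reference uses; the values are illustrative, not recommendations:

block_scheduler:
  consumer_group: "block-scheduler"
  # Plan jobs every five minutes.
  interval: 5m
  # Each planned job should cover roughly one hour of records.
  target_records_spanning_period: 1h
  # With no prior commits, start from the beginning of each partition
  # (-1 would start from the latest offset instead).
  lookback_period: -2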
80 changes: 80 additions & 0 deletions pkg/blockbuilder/scheduler/kafkautil.go
@@ -0,0 +1,80 @@
// SPDX-License-Identifier: AGPL-3.0-only

package scheduler

import (
"context"
"errors"
"fmt"
"sync"

"github.com/twmb/franz-go/pkg/kadm"
"github.com/twmb/franz-go/pkg/kerr"
)

// GetGroupLag is similar to `kadm.Client.Lag` but works when the group doesn't have live participants.
// Similar to `kadm.CalculateGroupLagWithStartOffsets`, it takes into account that the group may not have any commits.
//
// The lag is the difference between the last produced offset (high watermark) and an offset in the "past".
// If the block builder committed an offset for a given partition to the consumer group at least once, then
// the lag is the difference between the last produced offset and the offset committed in the consumer group.
// Otherwise, if the block builder hasn't committed an offset for a given partition yet (e.g. the block builder is
// running for the first time), the lag is the difference between the last produced offset and the offset
// resolved from fallbackOffsetMillis (the first offset whose record timestamp is at or after that time).
func GetGroupLag(ctx context.Context, admClient *kadm.Client, topic, group string, fallbackOffsetMillis int64) (kadm.GroupLag, error) {
offsets, err := admClient.FetchOffsets(ctx, group)
if err != nil {
if !errors.Is(err, kerr.GroupIDNotFound) {
return nil, fmt.Errorf("fetch offsets: %w", err)
}
}
if err := offsets.Error(); err != nil {
return nil, fmt.Errorf("fetch offsets got error in response: %w", err)
}

startOffsets, err := admClient.ListStartOffsets(ctx, topic)
if err != nil {
return nil, err
}
endOffsets, err := admClient.ListEndOffsets(ctx, topic)
if err != nil {
return nil, err
}

resolveFallbackOffsets := sync.OnceValues(func() (kadm.ListedOffsets, error) {
return admClient.ListOffsetsAfterMilli(ctx, fallbackOffsetMillis, topic)
})
// If the group-partition in offsets doesn't have a commit, fall back depending on where fallbackOffsetMillis points.
for topic, pt := range startOffsets.Offsets() {
for partition, startOffset := range pt {
if _, ok := offsets.Lookup(topic, partition); ok {
continue
}
fallbackOffsets, err := resolveFallbackOffsets()
if err != nil {
return nil, fmt.Errorf("resolve fallback offsets: %w", err)
}
o, ok := fallbackOffsets.Lookup(topic, partition)
if !ok {
return nil, fmt.Errorf("partition %d not found in fallback offsets for topic %s", partition, topic)
}
if o.Offset < startOffset.At {
// Skip the resolved fallback offset if it's before the partition's start offset (i.e. before the earliest offset of the partition).
// This should not happen in Kafka, but can happen in Kafka-compatible systems, e.g. Warpstream.
continue
}
offsets.Add(kadm.OffsetResponse{Offset: kadm.Offset{
Topic: o.Topic,
Partition: o.Partition,
At: o.Offset,
LeaderEpoch: o.LeaderEpoch,
}})
}
}

descrGroup := kadm.DescribedGroup{
// "Empty" is the state that indicates that the group doesn't have active consumer members; this is always the case for block-builder,
// because we don't use group consumption.
State: "Empty",
}
return kadm.CalculateGroupLagWithStartOffsets(descrGroup, offsets, startOffsets, endOffsets), nil
}
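As a rough illustration of how a caller might consume this helper, the sketch below resolves per-partition lag for the scheduler's consumer group with a one-hour lookback. It is not part of this commit: printGroupLag is a hypothetical helper, and reading Commit.At and Lag from kadm.GroupMemberLag (a topic-keyed map, as offsets_reader.go below also assumes) follows the franz-go admin API.

// Hypothetical sketch; not part of this commit.
package scheduler

import (
	"context"
	"fmt"
	"time"

	"github.com/twmb/franz-go/pkg/kadm"
)

func printGroupLag(ctx context.Context, adm *kadm.Client, topic, group string) error {
	// Partitions the group never committed fall back to the first offset whose
	// record timestamp is at or after this point in time.
	fallbackMillis := time.Now().Add(-time.Hour).UnixMilli()

	lag, err := GetGroupLag(ctx, adm, topic, group, fallbackMillis)
	if err != nil {
		return err
	}
	for _, partitions := range lag { // kadm.GroupLag is keyed by topic
		for partition, l := range partitions {
			fmt.Printf("partition=%d committed=%d lag=%d\n", partition, l.Commit.At, l.Lag)
		}
	}
	return nil
}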
164 changes: 164 additions & 0 deletions pkg/blockbuilder/scheduler/kafkautil_test.go
@@ -0,0 +1,164 @@
// SPDX-License-Identifier: AGPL-3.0-only

package scheduler

import (
"context"
"errors"
"testing"
"time"

"github.com/stretchr/testify/require"
"github.com/twmb/franz-go/pkg/kadm"
"github.com/twmb/franz-go/pkg/kgo"

"github.com/grafana/loki/v3/pkg/kafka/testkafka"
)

const (
testTopic = "test"
testGroup = "testgroup"
)

func TestKafkaGetGroupLag(t *testing.T) {
ctx, cancel := context.WithCancelCause(context.Background())
t.Cleanup(func() { cancel(errors.New("test done")) })

_, addr := testkafka.CreateClusterWithoutCustomConsumerGroupsSupport(t, 3, testTopic)
kafkaClient := mustKafkaClient(t, addr)
admClient := kadm.NewClient(kafkaClient)

const numRecords = 5

var producedRecords []kgo.Record
kafkaTime := time.Now().Add(-12 * time.Hour)
for i := int64(0); i < numRecords; i++ {
kafkaTime = kafkaTime.Add(time.Minute)

// Produce records to partition 0 and keep them for later assertions.
res := produceRecords(ctx, t, kafkaClient, kafkaTime, "1", testTopic, 0, []byte(`test value`))
rec, err := res.First()
require.NoError(t, err)
require.NotNil(t, rec)

producedRecords = append(producedRecords, *rec)

// Produce same records to partition 1 (this partition won't have any commits).
produceRecords(ctx, t, kafkaClient, kafkaTime, "1", testTopic, 1, []byte(`test value`))
}
require.Len(t, producedRecords, numRecords)

// Commit last produced record from partition 0.
rec := producedRecords[len(producedRecords)-1]
offsets := make(kadm.Offsets)
offsets.Add(kadm.Offset{
Topic: rec.Topic,
Partition: rec.Partition,
At: rec.Offset + 1,
LeaderEpoch: rec.LeaderEpoch,
})
err := admClient.CommitAllOffsets(ctx, testGroup, offsets)
require.NoError(t, err)

// Truncate partition 1 after the second-to-last record to emulate retention.
// Note that Kafka sets the partition's start offset to the requested offset; any records within the segment before the requested offset can no longer be read.
// Note the difference between DeleteRecords and DeleteOffsets in the kadm docs.
deleteRecOffsets := make(kadm.Offsets)
deleteRecOffsets.Add(kadm.Offset{
Topic: testTopic,
Partition: 1,
At: numRecords - 2,
})
_, err = admClient.DeleteRecords(ctx, deleteRecOffsets)
require.NoError(t, err)

getTopicPartitionLag := func(t *testing.T, lag kadm.GroupLag, topic string, part int32) int64 {
l, ok := lag.Lookup(topic, part)
require.True(t, ok)
return l.Lag
}

t.Run("fallbackOffset=milliseconds", func(t *testing.T) {
// get the timestamp of the last produced record
rec := producedRecords[len(producedRecords)-1]
fallbackOffset := rec.Timestamp.Add(-time.Millisecond).UnixMilli()
groupLag, err := GetGroupLag(ctx, admClient, testTopic, testGroup, fallbackOffset)
require.NoError(t, err)

require.EqualValues(t, 0, getTopicPartitionLag(t, groupLag, testTopic, 0), "partition 0 must have no lag")
require.EqualValues(t, 1, getTopicPartitionLag(t, groupLag, testTopic, 1), "partition 1 must fall back to known record and get its lag from there")
require.EqualValues(t, 0, getTopicPartitionLag(t, groupLag, testTopic, 2), "partition 2 has no data and must have no lag")
})

t.Run("fallbackOffset=before-earliest", func(t *testing.T) {
// get the timestamp of the third-to-last produced record (the record before the earliest retained in partition 1)
rec := producedRecords[len(producedRecords)-3]
fallbackOffset := rec.Timestamp.Add(-time.Millisecond).UnixMilli()
groupLag, err := GetGroupLag(ctx, admClient, testTopic, testGroup, fallbackOffset)
require.NoError(t, err)

require.EqualValues(t, 0, getTopicPartitionLag(t, groupLag, testTopic, 0), "partition 0 must have no lag")
require.EqualValues(t, 2, getTopicPartitionLag(t, groupLag, testTopic, 1), "partition 1 must fall back to earliest and get its lag from there")
require.EqualValues(t, 0, getTopicPartitionLag(t, groupLag, testTopic, 2), "partition 2 has no data and must have no lag")
})

t.Run("fallbackOffset=0", func(t *testing.T) {
groupLag, err := GetGroupLag(ctx, admClient, testTopic, testGroup, 0)
require.NoError(t, err)

require.EqualValues(t, 0, getTopicPartitionLag(t, groupLag, testTopic, 0), "partition 0 must have no lag")
require.EqualValues(t, 2, getTopicPartitionLag(t, groupLag, testTopic, 1), "partition 1 must fall back to the earliest and get its lag from there")
require.EqualValues(t, 0, getTopicPartitionLag(t, groupLag, testTopic, 2), "partition 2 has no data and must have no lag")
})

t.Run("group=unknown", func(t *testing.T) {
groupLag, err := GetGroupLag(ctx, admClient, testTopic, "unknown", 0)
require.NoError(t, err)

// This group doesn't have any commits, so it must calculate its lag from the fallback.
require.EqualValues(t, numRecords, getTopicPartitionLag(t, groupLag, testTopic, 0))
require.EqualValues(t, 2, getTopicPartitionLag(t, groupLag, testTopic, 1), "partition 1 must fall back to the earliest and get its lag from there")
require.EqualValues(t, 0, getTopicPartitionLag(t, groupLag, testTopic, 2), "partition 2 has no data and must have no lag")
})
}

func mustKafkaClient(t *testing.T, addrs ...string) *kgo.Client {
writeClient, err := kgo.NewClient(
kgo.SeedBrokers(addrs...),
kgo.AllowAutoTopicCreation(),
// We will choose the partition of each record.
kgo.RecordPartitioner(kgo.ManualPartitioner()),
)
require.NoError(t, err)
t.Cleanup(writeClient.Close)
return writeClient
}

func produceRecords(
ctx context.Context,
t *testing.T,
kafkaClient *kgo.Client,
ts time.Time,
userID string,
topic string,
part int32,
val []byte,
) kgo.ProduceResults {
rec := &kgo.Record{
Timestamp: ts,
Key: []byte(userID),
Value: val,
Topic: topic,
Partition: part, // the partition is chosen explicitly by the caller (manual partitioner)
}
produceResult := kafkaClient.ProduceSync(ctx, rec)
require.NoError(t, produceResult.FirstErr())
return produceResult
}

func commitOffset(ctx context.Context, t *testing.T, kafkaClient *kgo.Client, group string, offset kadm.Offset) {
offsets := make(kadm.Offsets)
offsets.Add(offset)
err := kadm.NewClient(kafkaClient).CommitAllOffsets(ctx, group, offsets)
require.NoError(t, err)
}
24 changes: 24 additions & 0 deletions pkg/blockbuilder/scheduler/metrics.go
@@ -0,0 +1,24 @@
package scheduler

import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)

type Metrics struct {
lag *prometheus.GaugeVec
committedOffset *prometheus.GaugeVec
}

func NewMetrics(reg prometheus.Registerer) *Metrics {
return &Metrics{
lag: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
Name: "loki_block_scheduler_group_lag",
Help: "How far behind the block scheduler consumer group is from the latest offset.",
}, []string{"partition"}),
committedOffset: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
Name: "loki_block_scheduler_group_committed_offset",
Help: "The current offset the block scheduler consumer group is at.",
}, []string{"partition"}),
}
}
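The commit doesn't show where these gauges are written. A plausible sketch, assuming the scheduler receives the per-partition lag map produced by the offset reader and that kadm.GroupMemberLag exposes Commit.At and Lag:

// Hypothetical sketch; not part of this commit.
package scheduler

import (
	"strconv"

	"github.com/twmb/franz-go/pkg/kadm"
)

func (m *Metrics) observeLag(lag map[int32]kadm.GroupMemberLag) {
	for partition, l := range lag {
		p := strconv.Itoa(int(partition))
		// Offset the consumer group has committed for this partition.
		m.committedOffset.WithLabelValues(p).Set(float64(l.Commit.At))
		// Distance between the latest produced offset and the committed (or fallback) offset.
		m.lag.WithLabelValues(p).Set(float64(l.Lag))
	}
}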
62 changes: 62 additions & 0 deletions pkg/blockbuilder/scheduler/offsets_reader.go
@@ -0,0 +1,62 @@
package scheduler

import (
"context"
"errors"
"time"

"github.com/twmb/franz-go/pkg/kadm"
"github.com/twmb/franz-go/pkg/kgo"
)

type offsetReader struct {
topic string
consumerGroup string
fallbackOffsetMillis int64

adminClient *kadm.Client
}

func NewOffsetReader(topic, consumerGroup string, lookbackPeriodInMs int64, client *kgo.Client) OffsetReader {
var fallbackOffsetMillis int64
if lookbackPeriodInMs >= 0 {
fallbackOffsetMillis = time.Now().UnixMilli() - lookbackPeriodInMs
} else {
fallbackOffsetMillis = lookbackPeriodInMs
}

return &offsetReader{
topic: topic,
consumerGroup: consumerGroup,
adminClient: kadm.NewClient(client),
fallbackOffsetMillis: fallbackOffsetMillis,
}
}

func (r *offsetReader) GroupLag(ctx context.Context) (map[int32]kadm.GroupMemberLag, error) {
lag, err := GetGroupLag(ctx, r.adminClient, r.topic, r.consumerGroup, r.fallbackOffsetMillis)
if err != nil {
return nil, err
}

offsets, ok := lag[r.topic]
if !ok {
return nil, errors.New("no lag found for the topic")
}

return offsets, nil
}

func (r *offsetReader) ListOffsetsAfterMilli(ctx context.Context, ts int64) (map[int32]kadm.ListedOffset, error) {
offsets, err := r.adminClient.ListOffsetsAfterMilli(ctx, ts, r.topic)
if err != nil {
return nil, err
}

resp, ok := offsets[r.topic]
if !ok {
return nil, errors.New("no offsets found for the topic")
}

return resp, nil
}
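A minimal sketch of wiring the reader up, assuming the OffsetReader interface (defined elsewhere in the package) exposes GroupLag and using a placeholder topic name. A one-hour lookback is translated into a concrete "now minus 1h" millisecond timestamp inside NewOffsetReader, while -1 and -2 pass through unchanged:

// Hypothetical sketch; not part of this commit.
package scheduler

import (
	"context"
	"time"

	"github.com/twmb/franz-go/pkg/kadm"
	"github.com/twmb/franz-go/pkg/kgo"
)

func exampleGroupLag(ctx context.Context, client *kgo.Client) (map[int32]kadm.GroupMemberLag, error) {
	lookbackMillis := time.Hour.Milliseconds() // one hour, expressed in milliseconds
	reader := NewOffsetReader("loki.records", "block-scheduler", lookbackMillis, client)
	return reader.GroupLag(ctx)
}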
19 changes: 19 additions & 0 deletions pkg/blockbuilder/scheduler/queue.go
@@ -30,6 +30,25 @@ func NewJobQueue() *JobQueue {
}
}

func (q *JobQueue) Exists(job *types.Job) (types.JobStatus, bool) {
q.mu.RLock()
defer q.mu.RUnlock()

if _, ok := q.inProgress[job.ID]; ok {
return types.JobStatusInProgress, true
}

if _, ok := q.pending[job.ID]; ok {
return types.JobStatusPending, true
}

if _, ok := q.completed[job.ID]; ok {
return types.JobStatusComplete, true
}

return -1, false
}

// Enqueue adds a new job to the pending queue
// This is a naive implementation, intended to be refactored
func (q *JobQueue) Enqueue(job *types.Job) error {
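A sketch of how a planner might use the new Exists check to avoid re-enqueueing jobs it has already planned. The types import path and the enqueueNewJobs helper are assumptions based on the surrounding diff, not part of this commit:

// Hypothetical sketch; not part of this commit.
package scheduler

import (
	"github.com/grafana/loki/v3/pkg/blockbuilder/types"
)

func enqueueNewJobs(q *JobQueue, planned []*types.Job) error {
	for _, job := range planned {
		if _, ok := q.Exists(job); ok {
			// Already pending, in progress, or complete; skip it.
			continue
		}
		if err := q.Enqueue(job); err != nil {
			return err
		}
	}
	return nil
}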
