Skip to content

Commit

Permalink
fix(block-scheduler): init record planner correctly
Browse files Browse the repository at this point in the history
  • Loading branch information
ashwanthgoli committed Dec 12, 2024
1 parent b7fe6bf commit 18c9c05
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 26 deletions.
16 changes: 11 additions & 5 deletions pkg/blockbuilder/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ type Config struct {
Interval time.Duration `yaml:"interval"`
LookbackPeriod time.Duration `yaml:"lookback_period"`
Strategy string `yaml:"strategy"`
planner Planner `yaml:"-"` // validated planner
TargetRecordCount int64 `yaml:"target_record_count"`
}

Expand Down Expand Up @@ -74,7 +73,6 @@ func (cfg *Config) Validate() error {
if cfg.TargetRecordCount <= 0 {
return errors.New("target record count must be a non-zero value")
}
cfg.planner = NewRecordCountPlanner(cfg.TargetRecordCount)
default:
return fmt.Errorf("invalid strategy: %s", cfg.Strategy)
}
Expand All @@ -96,17 +94,25 @@ type BlockScheduler struct {
}

// NewScheduler creates a new scheduler instance
func NewScheduler(cfg Config, queue *JobQueue, offsetManager partition.OffsetManager, logger log.Logger, r prometheus.Registerer) *BlockScheduler {
func NewScheduler(cfg Config, queue *JobQueue, offsetManager partition.OffsetManager, logger log.Logger, r prometheus.Registerer) (*BlockScheduler, error) {
var planner Planner
switch cfg.Strategy {
case RecordCountStrategy:
planner = NewRecordCountPlanner(offsetManager, cfg.TargetRecordCount, cfg.LookbackPeriod, logger)
default:
return nil, fmt.Errorf("invalid strategy: %s", cfg.Strategy)
}

s := &BlockScheduler{
cfg: cfg,
planner: cfg.planner,
planner: planner,
offsetManager: offsetManager,
logger: logger,
metrics: NewMetrics(r),
queue: queue,
}
s.Service = services.NewBasicService(nil, s.running, nil)
return s
return s, nil
}

func (s *BlockScheduler) running(ctx context.Context) error {
Expand Down
36 changes: 23 additions & 13 deletions pkg/blockbuilder/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,17 @@ func (m *mockOffsetManager) Commit(_ context.Context, _ int32, _ int64) error {
return nil
}

func newTestEnv(builderID string) *testEnv {
func newTestEnv(builderID string) (*testEnv, error) {
queue := NewJobQueue()
mockOffsetMgr := &mockOffsetManager{
topic: "test-topic",
consumerGroup: "test-group",
}
scheduler := NewScheduler(Config{}, queue, mockOffsetMgr, log.NewNopLogger(), prometheus.NewRegistry())
scheduler, err := NewScheduler(Config{Strategy: RecordCountStrategy}, queue, mockOffsetMgr, log.NewNopLogger(), prometheus.NewRegistry())
if err != nil {
return nil, err
}

transport := types.NewMemoryTransport(scheduler)
builder := NewWorker(builderID, transport)

Expand All @@ -55,16 +59,20 @@ func newTestEnv(builderID string) *testEnv {
scheduler: scheduler,
transport: transport,
builder: builder,
}
}, err
}

func TestScheduleAndProcessJob(t *testing.T) {
env := newTestEnv("test-builder-1")
env, err := newTestEnv("test-builder-1")
if err != nil {
t.Fatalf("failed to create test environment: %v", err)
}

ctx := context.Background()

// Create and enqueue a test job
job := types.NewJob(1, types.Offsets{Min: 100, Max: 200})
err := env.queue.Enqueue(job, 100)
err = env.queue.Enqueue(job, 100)
if err != nil {
t.Fatalf("failed to enqueue job: %v", err)
}
Expand Down Expand Up @@ -98,21 +106,27 @@ func TestScheduleAndProcessJob(t *testing.T) {
}

func TestContextCancellation(t *testing.T) {
env := newTestEnv("test-builder-1")
env, err := newTestEnv("test-builder-1")
if err != nil {
t.Fatalf("failed to create test environment: %v", err)
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
defer cancel()

// Try to get job after context timeout
time.Sleep(20 * time.Millisecond)
_, _, err := env.builder.GetJob(ctx)
_, _, err = env.builder.GetJob(ctx)
if err == nil {
t.Error("expected error from cancelled context")
}
}

func TestMultipleBuilders(t *testing.T) {
// Create first environment
env1 := newTestEnv("test-builder-1")
env1, err := newTestEnv("test-builder-1")
if err != nil {
t.Fatalf("failed to create test environment: %v", err)
}
// Create second builder using same scheduler
builder2 := NewWorker("test-builder-2", env1.transport)

Expand All @@ -123,7 +137,7 @@ func TestMultipleBuilders(t *testing.T) {
job2 := types.NewJob(2, types.Offsets{Min: 300, Max: 400})

// Enqueue jobs
err := env1.queue.Enqueue(job1, 100)
err = env1.queue.Enqueue(job1, 100)
if err != nil {
t.Fatalf("failed to enqueue job1: %v", err)
}
Expand Down Expand Up @@ -268,10 +282,6 @@ func TestConfig_Validate(t *testing.T) {
if err != nil {
t.Errorf("Validate() error = %v, wantErr nil", err)
}
// Check that planner is set for valid configs
if tt.cfg.planner == nil {
t.Error("Validate() did not set planner for valid config")
}
})
}
}
Expand Down
11 changes: 8 additions & 3 deletions pkg/blockbuilder/scheduler/strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package scheduler
import (
"context"
"sort"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
Expand All @@ -13,7 +14,7 @@ import (

// OffsetReader is an interface to list offsets for all partitions of a topic from Kafka.
type OffsetReader interface {
GroupLag(context.Context) (map[int32]kadm.GroupMemberLag, error)
GroupLag(context.Context, time.Duration) (map[int32]kadm.GroupMemberLag, error)
}

type Planner interface {
Expand All @@ -32,13 +33,17 @@ var validStrategies = []string{
// tries to consume upto targetRecordCount records per partition
type RecordCountPlanner struct {
targetRecordCount int64
lookbackPeriod time.Duration
offsetReader OffsetReader
logger log.Logger
}

func NewRecordCountPlanner(targetRecordCount int64) *RecordCountPlanner {
func NewRecordCountPlanner(offsetReader OffsetReader, targetRecordCount int64, lookbackPeriod time.Duration, logger log.Logger) *RecordCountPlanner {
return &RecordCountPlanner{
targetRecordCount: targetRecordCount,
lookbackPeriod: lookbackPeriod,
offsetReader: offsetReader,
logger: logger,
}
}

Expand All @@ -47,7 +52,7 @@ func (p *RecordCountPlanner) Name() string {
}

func (p *RecordCountPlanner) Plan(ctx context.Context) ([]*JobWithMetadata, error) {
offsets, err := p.offsetReader.GroupLag(ctx)
offsets, err := p.offsetReader.GroupLag(ctx, p.lookbackPeriod)
if err != nil {
level.Error(p.logger).Log("msg", "failed to get group lag", "err", err)
return nil, err
Expand Down
7 changes: 3 additions & 4 deletions pkg/blockbuilder/scheduler/strategy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"testing"
"time"

"github.com/go-kit/log"
"github.com/stretchr/testify/require"
"github.com/twmb/franz-go/pkg/kadm"

Expand All @@ -15,7 +16,7 @@ type mockOffsetReader struct {
groupLag map[int32]kadm.GroupMemberLag
}

func (m *mockOffsetReader) GroupLag(_ context.Context) (map[int32]kadm.GroupMemberLag, error) {
func (m *mockOffsetReader) GroupLag(_ context.Context, _ time.Duration) (map[int32]kadm.GroupMemberLag, error) {
return m.groupLag, nil
}

Expand Down Expand Up @@ -145,9 +146,7 @@ func TestRecordCountPlanner_Plan(t *testing.T) {
TargetRecordCount: tc.recordCount,
}
require.NoError(t, cfg.Validate())

planner := NewRecordCountPlanner(tc.recordCount)
planner.offsetReader = mockReader
planner := NewRecordCountPlanner(mockReader, tc.recordCount, time.Hour, log.NewNopLogger())

jobs, err := planner.Plan(context.Background())
require.NoError(t, err)
Expand Down
5 changes: 4 additions & 1 deletion pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -1877,13 +1877,16 @@ func (t *Loki) initBlockScheduler() (services.Service, error) {
return nil, fmt.Errorf("creating kafka offset manager: %w", err)
}

s := blockscheduler.NewScheduler(
s, err := blockscheduler.NewScheduler(
t.Cfg.BlockScheduler,
blockscheduler.NewJobQueueWithLogger(logger),
offsetManager,
logger,
prometheus.DefaultRegisterer,
)
if err != nil {
return nil, err
}

blockprotos.RegisterSchedulerServiceServer(
t.Server.GRPC,
Expand Down

0 comments on commit 18c9c05

Please sign in to comment.