From 117869fca81cf1bb38e4d788d7601d6957b568e3 Mon Sep 17 00:00:00 2001 From: Ashwanth Date: Fri, 13 Dec 2024 19:00:19 +0530 Subject: [PATCH] fix(block-scheduler): init record planner correctly (#15390) --- pkg/blockbuilder/scheduler/scheduler.go | 16 ++++++--- pkg/blockbuilder/scheduler/scheduler_test.go | 36 +++++++++++++------- pkg/blockbuilder/scheduler/strategy.go | 11 ++++-- pkg/blockbuilder/scheduler/strategy_test.go | 7 ++-- pkg/loki/modules.go | 5 ++- 5 files changed, 49 insertions(+), 26 deletions(-) diff --git a/pkg/blockbuilder/scheduler/scheduler.go b/pkg/blockbuilder/scheduler/scheduler.go index 5e55e3123420d..55aa95459a1d9 100644 --- a/pkg/blockbuilder/scheduler/scheduler.go +++ b/pkg/blockbuilder/scheduler/scheduler.go @@ -28,7 +28,6 @@ type Config struct { Interval time.Duration `yaml:"interval"` LookbackPeriod time.Duration `yaml:"lookback_period"` Strategy string `yaml:"strategy"` - planner Planner `yaml:"-"` // validated planner TargetRecordCount int64 `yaml:"target_record_count"` } @@ -74,7 +73,6 @@ func (cfg *Config) Validate() error { if cfg.TargetRecordCount <= 0 { return errors.New("target record count must be a non-zero value") } - cfg.planner = NewRecordCountPlanner(cfg.TargetRecordCount) default: return fmt.Errorf("invalid strategy: %s", cfg.Strategy) } @@ -96,17 +94,25 @@ type BlockScheduler struct { } // NewScheduler creates a new scheduler instance -func NewScheduler(cfg Config, queue *JobQueue, offsetManager partition.OffsetManager, logger log.Logger, r prometheus.Registerer) *BlockScheduler { +func NewScheduler(cfg Config, queue *JobQueue, offsetManager partition.OffsetManager, logger log.Logger, r prometheus.Registerer) (*BlockScheduler, error) { + var planner Planner + switch cfg.Strategy { + case RecordCountStrategy: + planner = NewRecordCountPlanner(offsetManager, cfg.TargetRecordCount, cfg.LookbackPeriod, logger) + default: + return nil, fmt.Errorf("invalid strategy: %s", cfg.Strategy) + } + s := &BlockScheduler{ cfg: cfg, - planner: cfg.planner, + planner: planner, offsetManager: offsetManager, logger: logger, metrics: NewMetrics(r), queue: queue, } s.Service = services.NewBasicService(nil, s.running, nil) - return s + return s, nil } func (s *BlockScheduler) running(ctx context.Context) error { diff --git a/pkg/blockbuilder/scheduler/scheduler_test.go b/pkg/blockbuilder/scheduler/scheduler_test.go index f13c6d49485c1..48460f5e39856 100644 --- a/pkg/blockbuilder/scheduler/scheduler_test.go +++ b/pkg/blockbuilder/scheduler/scheduler_test.go @@ -40,13 +40,17 @@ func (m *mockOffsetManager) Commit(_ context.Context, _ int32, _ int64) error { return nil } -func newTestEnv(builderID string) *testEnv { +func newTestEnv(builderID string) (*testEnv, error) { queue := NewJobQueue() mockOffsetMgr := &mockOffsetManager{ topic: "test-topic", consumerGroup: "test-group", } - scheduler := NewScheduler(Config{}, queue, mockOffsetMgr, log.NewNopLogger(), prometheus.NewRegistry()) + scheduler, err := NewScheduler(Config{Strategy: RecordCountStrategy}, queue, mockOffsetMgr, log.NewNopLogger(), prometheus.NewRegistry()) + if err != nil { + return nil, err + } + transport := types.NewMemoryTransport(scheduler) builder := NewWorker(builderID, transport) @@ -55,16 +59,20 @@ func newTestEnv(builderID string) *testEnv { scheduler: scheduler, transport: transport, builder: builder, - } + }, err } func TestScheduleAndProcessJob(t *testing.T) { - env := newTestEnv("test-builder-1") + env, err := newTestEnv("test-builder-1") + if err != nil { + t.Fatalf("failed to create test environment: %v", err) + } + ctx := context.Background() // Create and enqueue a test job job := types.NewJob(1, types.Offsets{Min: 100, Max: 200}) - err := env.queue.Enqueue(job, 100) + err = env.queue.Enqueue(job, 100) if err != nil { t.Fatalf("failed to enqueue job: %v", err) } @@ -98,13 +106,16 @@ func TestScheduleAndProcessJob(t *testing.T) { } func TestContextCancellation(t *testing.T) { - env := newTestEnv("test-builder-1") + env, err := newTestEnv("test-builder-1") + if err != nil { + t.Fatalf("failed to create test environment: %v", err) + } ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond) defer cancel() // Try to get job after context timeout time.Sleep(20 * time.Millisecond) - _, _, err := env.builder.GetJob(ctx) + _, _, err = env.builder.GetJob(ctx) if err == nil { t.Error("expected error from cancelled context") } @@ -112,7 +123,10 @@ func TestContextCancellation(t *testing.T) { func TestMultipleBuilders(t *testing.T) { // Create first environment - env1 := newTestEnv("test-builder-1") + env1, err := newTestEnv("test-builder-1") + if err != nil { + t.Fatalf("failed to create test environment: %v", err) + } // Create second builder using same scheduler builder2 := NewWorker("test-builder-2", env1.transport) @@ -123,7 +137,7 @@ func TestMultipleBuilders(t *testing.T) { job2 := types.NewJob(2, types.Offsets{Min: 300, Max: 400}) // Enqueue jobs - err := env1.queue.Enqueue(job1, 100) + err = env1.queue.Enqueue(job1, 100) if err != nil { t.Fatalf("failed to enqueue job1: %v", err) } @@ -268,10 +282,6 @@ func TestConfig_Validate(t *testing.T) { if err != nil { t.Errorf("Validate() error = %v, wantErr nil", err) } - // Check that planner is set for valid configs - if tt.cfg.planner == nil { - t.Error("Validate() did not set planner for valid config") - } }) } } diff --git a/pkg/blockbuilder/scheduler/strategy.go b/pkg/blockbuilder/scheduler/strategy.go index d4d14bff0e44a..e710d14767275 100644 --- a/pkg/blockbuilder/scheduler/strategy.go +++ b/pkg/blockbuilder/scheduler/strategy.go @@ -3,6 +3,7 @@ package scheduler import ( "context" "sort" + "time" "github.com/go-kit/log" "github.com/go-kit/log/level" @@ -13,7 +14,7 @@ import ( // OffsetReader is an interface to list offsets for all partitions of a topic from Kafka. type OffsetReader interface { - GroupLag(context.Context) (map[int32]kadm.GroupMemberLag, error) + GroupLag(context.Context, time.Duration) (map[int32]kadm.GroupMemberLag, error) } type Planner interface { @@ -32,13 +33,17 @@ var validStrategies = []string{ // tries to consume upto targetRecordCount records per partition type RecordCountPlanner struct { targetRecordCount int64 + lookbackPeriod time.Duration offsetReader OffsetReader logger log.Logger } -func NewRecordCountPlanner(targetRecordCount int64) *RecordCountPlanner { +func NewRecordCountPlanner(offsetReader OffsetReader, targetRecordCount int64, lookbackPeriod time.Duration, logger log.Logger) *RecordCountPlanner { return &RecordCountPlanner{ targetRecordCount: targetRecordCount, + lookbackPeriod: lookbackPeriod, + offsetReader: offsetReader, + logger: logger, } } @@ -47,7 +52,7 @@ func (p *RecordCountPlanner) Name() string { } func (p *RecordCountPlanner) Plan(ctx context.Context) ([]*JobWithMetadata, error) { - offsets, err := p.offsetReader.GroupLag(ctx) + offsets, err := p.offsetReader.GroupLag(ctx, p.lookbackPeriod) if err != nil { level.Error(p.logger).Log("msg", "failed to get group lag", "err", err) return nil, err diff --git a/pkg/blockbuilder/scheduler/strategy_test.go b/pkg/blockbuilder/scheduler/strategy_test.go index 6771aaf9868c5..7f6dedcb4abe6 100644 --- a/pkg/blockbuilder/scheduler/strategy_test.go +++ b/pkg/blockbuilder/scheduler/strategy_test.go @@ -5,6 +5,7 @@ import ( "testing" "time" + "github.com/go-kit/log" "github.com/stretchr/testify/require" "github.com/twmb/franz-go/pkg/kadm" @@ -15,7 +16,7 @@ type mockOffsetReader struct { groupLag map[int32]kadm.GroupMemberLag } -func (m *mockOffsetReader) GroupLag(_ context.Context) (map[int32]kadm.GroupMemberLag, error) { +func (m *mockOffsetReader) GroupLag(_ context.Context, _ time.Duration) (map[int32]kadm.GroupMemberLag, error) { return m.groupLag, nil } @@ -145,9 +146,7 @@ func TestRecordCountPlanner_Plan(t *testing.T) { TargetRecordCount: tc.recordCount, } require.NoError(t, cfg.Validate()) - - planner := NewRecordCountPlanner(tc.recordCount) - planner.offsetReader = mockReader + planner := NewRecordCountPlanner(mockReader, tc.recordCount, time.Hour, log.NewNopLogger()) jobs, err := planner.Plan(context.Background()) require.NoError(t, err) diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index 31393d874a2f4..49a26498b8e31 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -1877,13 +1877,16 @@ func (t *Loki) initBlockScheduler() (services.Service, error) { return nil, fmt.Errorf("creating kafka offset manager: %w", err) } - s := blockscheduler.NewScheduler( + s, err := blockscheduler.NewScheduler( t.Cfg.BlockScheduler, blockscheduler.NewJobQueueWithLogger(logger), offsetManager, logger, prometheus.DefaultRegisterer, ) + if err != nil { + return nil, err + } blockprotos.RegisterSchedulerServiceServer( t.Server.GRPC,