From 532bdbc15643445bcd770d8d45df8b25b2c926d3 Mon Sep 17 00:00:00 2001
From: Owen Diehl
Date: Wed, 4 Dec 2024 03:29:41 -0800
Subject: [PATCH] feat(blockbuilder): consolidate on record counting planner (#15247)

---
 docs/sources/shared/configuration.md         |  14 +-
 pkg/blockbuilder/scheduler/scheduler.go      |  44 +++++-
 pkg/blockbuilder/scheduler/scheduler_test.go | 101 +++++++++++++
 pkg/blockbuilder/scheduler/strategy.go       | 112 ++++-----------
 pkg/blockbuilder/scheduler/strategy_test.go  | 140 ++++++++++---------
 5 files changed, 247 insertions(+), 164 deletions(-)

diff --git a/docs/sources/shared/configuration.md b/docs/sources/shared/configuration.md
index 8cc10f9b9be3f..1160adff8f8ba 100644
--- a/docs/sources/shared/configuration.md
+++ b/docs/sources/shared/configuration.md
@@ -212,17 +212,21 @@ block_scheduler:
   # CLI flag: -block-scheduler.interval
   [interval: <duration> | default = 5m]
 
-  # Period used by the planner to calculate the start and end offset such that
-  # each job consumes records spanning the target period.
-  # CLI flag: -block-scheduler.target-records-spanning-period
-  [target_records_spanning_period: <duration> | default = 1h]
-
   # Lookback period in milliseconds used by the scheduler to plan jobs when the
   # consumer group has no commits. -1 consumes from the latest offset. -2
   # consumes from the start of the partition.
   # CLI flag: -block-scheduler.lookback-period
   [lookback_period: <int> | default = -2]
 
+  # Strategy used by the planner to plan jobs. One of record-count
+  # CLI flag: -block-scheduler.strategy
+  [strategy: <string> | default = "record-count"]
+
+  # Target record count used by the planner to plan jobs. Only used when
+  # strategy is record-count
+  # CLI flag: -block-scheduler.target-record-count
+  [target_record_count: <int> | default = 1000]
+
 pattern_ingester:
   # Whether the pattern ingester is enabled.
   # CLI flag: -pattern-ingester.enabled
diff --git a/pkg/blockbuilder/scheduler/scheduler.go b/pkg/blockbuilder/scheduler/scheduler.go
index 96356515a921f..632e6842993a5 100644
--- a/pkg/blockbuilder/scheduler/scheduler.go
+++ b/pkg/blockbuilder/scheduler/scheduler.go
@@ -4,7 +4,9 @@ import (
 	"context"
 	"errors"
 	"flag"
+	"fmt"
 	"strconv"
+	"strings"
 	"time"
 
 	"github.com/go-kit/log"
@@ -22,17 +24,36 @@ var (
 )
 
 type Config struct {
-	ConsumerGroup                 string        `yaml:"consumer_group"`
-	Interval                      time.Duration `yaml:"interval"`
-	TargetRecordConsumptionPeriod time.Duration `yaml:"target_records_spanning_period"`
-	LookbackPeriod                int64         `yaml:"lookback_period"`
+	ConsumerGroup     string        `yaml:"consumer_group"`
+	Interval          time.Duration `yaml:"interval"`
+	LookbackPeriod    int64         `yaml:"lookback_period"`
+	Strategy          string        `yaml:"strategy"`
+	planner           Planner       `yaml:"-"` // validated planner
+	TargetRecordCount int64         `yaml:"target_record_count"`
 }
 
 func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
 	f.DurationVar(&cfg.Interval, prefix+"interval", 5*time.Minute, "How often the scheduler should plan jobs.")
-	f.DurationVar(&cfg.TargetRecordConsumptionPeriod, prefix+"target-records-spanning-period", time.Hour, "Period used by the planner to calculate the start and end offset such that each job consumes records spanning the target period.")
 	f.StringVar(&cfg.ConsumerGroup, prefix+"consumer-group", "block-scheduler", "Consumer group used by block scheduler to track the last consumed offset.")
 	f.Int64Var(&cfg.LookbackPeriod, prefix+"lookback-period", -2, "Lookback period in milliseconds used by the scheduler to plan jobs when the consumer group has no commits. -1 consumes from the latest offset. 
-2 consumes from the start of the partition.") + f.StringVar( + &cfg.Strategy, + prefix+"strategy", + RecordCountStrategy, + fmt.Sprintf( + "Strategy used by the planner to plan jobs. One of %s", + strings.Join(validStrategies, ", "), + ), + ) + f.Int64Var( + &cfg.TargetRecordCount, + prefix+"target-record-count", + 1000, + fmt.Sprintf( + "Target record count used by the planner to plan jobs. Only used when strategy is %s", + RecordCountStrategy, + ), + ) } func (cfg *Config) RegisterFlags(f *flag.FlagSet) { @@ -48,6 +69,16 @@ func (cfg *Config) Validate() error { return errors.New("only -1(latest) and -2(earliest) are valid as negative values for lookback_period") } + switch cfg.Strategy { + case RecordCountStrategy: + if cfg.TargetRecordCount <= 0 { + return errors.New("target record count must be a non-zero value") + } + cfg.planner = NewRecordCountPlanner(cfg.TargetRecordCount) + default: + return fmt.Errorf("invalid strategy: %s", cfg.Strategy) + } + return nil } @@ -66,10 +97,9 @@ type BlockScheduler struct { // NewScheduler creates a new scheduler instance func NewScheduler(cfg Config, queue *JobQueue, offsetReader OffsetReader, logger log.Logger, r prometheus.Registerer) *BlockScheduler { - planner := NewTimeRangePlanner(cfg.TargetRecordConsumptionPeriod, offsetReader, func() time.Time { return time.Now().UTC() }, logger) s := &BlockScheduler{ cfg: cfg, - planner: planner, + planner: cfg.planner, offsetReader: offsetReader, logger: logger, metrics: NewMetrics(r), diff --git a/pkg/blockbuilder/scheduler/scheduler_test.go b/pkg/blockbuilder/scheduler/scheduler_test.go index 2d857d06a2fe9..977dd1d556d1e 100644 --- a/pkg/blockbuilder/scheduler/scheduler_test.go +++ b/pkg/blockbuilder/scheduler/scheduler_test.go @@ -149,3 +149,104 @@ func TestMultipleBuilders(t *testing.T) { t.Error("builder1 got unexpected second job") } } + +func TestConfig_Validate(t *testing.T) { + tests := []struct { + name string + cfg Config + wantErr string + }{ + { + name: "valid config with record count strategy", + cfg: Config{ + Interval: time.Minute, + LookbackPeriod: -1, + Strategy: RecordCountStrategy, + TargetRecordCount: 1000, + }, + }, + { + name: "zero interval", + cfg: Config{ + Interval: 0, + LookbackPeriod: -1, + Strategy: RecordCountStrategy, + TargetRecordCount: 1000, + }, + wantErr: "interval must be a non-zero value", + }, + { + name: "negative interval", + cfg: Config{ + Interval: -time.Minute, + LookbackPeriod: -1, + Strategy: RecordCountStrategy, + TargetRecordCount: 1000, + }, + wantErr: "interval must be a non-zero value", + }, + { + name: "invalid lookback period", + cfg: Config{ + Interval: time.Minute, + LookbackPeriod: -3, + Strategy: RecordCountStrategy, + TargetRecordCount: 1000, + }, + wantErr: "only -1(latest) and -2(earliest) are valid as negative values for lookback_period", + }, + { + name: "invalid strategy", + cfg: Config{ + Interval: time.Minute, + LookbackPeriod: -1, + Strategy: "invalid", + TargetRecordCount: 1000, + }, + wantErr: "invalid strategy: invalid", + }, + { + name: "zero target record count", + cfg: Config{ + Interval: time.Minute, + LookbackPeriod: -1, + Strategy: RecordCountStrategy, + TargetRecordCount: 0, + }, + wantErr: "target record count must be a non-zero value", + }, + { + name: "negative target record count", + cfg: Config{ + Interval: time.Minute, + LookbackPeriod: -1, + Strategy: RecordCountStrategy, + TargetRecordCount: -1000, + }, + wantErr: "target record count must be a non-zero value", + }, + } + + for _, tt := range tests { + t.Run(tt.name, 
func(t *testing.T) { + err := tt.cfg.Validate() + if tt.wantErr != "" { + if err == nil { + t.Errorf("Validate() error = nil, wantErr %v", tt.wantErr) + return + } + if err.Error() != tt.wantErr { + t.Errorf("Validate() error = %v, wantErr %v", err, tt.wantErr) + } + return + } + if err != nil { + t.Errorf("Validate() error = %v, wantErr nil", err) + } + // Check that planner is set for valid configs + if tt.cfg.planner == nil { + t.Error("Validate() did not set planner for valid config") + } + }) + } +} diff --git a/pkg/blockbuilder/scheduler/strategy.go b/pkg/blockbuilder/scheduler/strategy.go index 8824c16f510ea..0b8c38aa64d11 100644 --- a/pkg/blockbuilder/scheduler/strategy.go +++ b/pkg/blockbuilder/scheduler/strategy.go @@ -3,7 +3,6 @@ package scheduler import ( "context" "sort" - "time" "github.com/go-kit/log" "github.com/go-kit/log/level" @@ -14,7 +13,6 @@ import ( // OffsetReader is an interface to list offsets for all partitions of a topic from Kafka. type OffsetReader interface { - ListOffsetsAfterMilli(context.Context, int64) (map[int32]kadm.ListedOffset, error) GroupLag(context.Context) (map[int32]kadm.GroupMemberLag, error) } @@ -24,10 +22,13 @@ type Planner interface { } const ( - RecordCountStrategy = "record_count" - TimeRangeStrategy = "time_range" + RecordCountStrategy = "record-count" ) +var validStrategies = []string{ + RecordCountStrategy, +} + // tries to consume upto targetRecordCount records per partition type RecordCountPlanner struct { targetRecordCount int64 @@ -52,101 +53,40 @@ func (p *RecordCountPlanner) Plan(ctx context.Context) ([]*JobWithPriority[int], return nil, err } - var jobs []*JobWithPriority[int] + jobs := make([]*JobWithPriority[int], 0, len(offsets)) for _, partitionOffset := range offsets { // kadm.GroupMemberLag contains valid Commit.At even when consumer group never committed any offset. // no additional validation is needed here startOffset := partitionOffset.Commit.At + 1 - endOffset := min(startOffset+p.targetRecordCount, partitionOffset.End.Offset) - - job := NewJobWithPriority( - types.NewJob(int(partitionOffset.Partition), types.Offsets{ - Min: startOffset, - Max: endOffset, - }), int(partitionOffset.End.Offset-startOffset), - ) - - jobs = append(jobs, job) - } - - // Sort jobs by partition number to ensure consistent ordering - sort.Slice(jobs, func(i, j int) bool { - return jobs[i].Job.Partition < jobs[j].Job.Partition - }) - - return jobs, nil -} - -// Targets consuming records spanning a configured period. -// This is a stateless planner, it is upto the caller to deduplicate or update jobs that are already in queue or progress. -type TimeRangePlanner struct { - offsetReader OffsetReader - - buffer time.Duration - targetPeriod time.Duration - now func() time.Time - - logger log.Logger -} - -func NewTimeRangePlanner(interval time.Duration, offsetReader OffsetReader, now func() time.Time, logger log.Logger) *TimeRangePlanner { - return &TimeRangePlanner{ - targetPeriod: interval, - buffer: interval, - offsetReader: offsetReader, - now: now, - logger: logger, - } -} - -func (p *TimeRangePlanner) Name() string { - return TimeRangeStrategy -} - -func (p *TimeRangePlanner) Plan(ctx context.Context) ([]*JobWithPriority[int], error) { - // truncate to the nearest Interval - consumeUptoTS := p.now().Add(-p.buffer).Truncate(p.targetPeriod) - - // this will return the latest offset in the partition if no records are produced after this ts. 
- consumeUptoOffsets, err := p.offsetReader.ListOffsetsAfterMilli(ctx, consumeUptoTS.UnixMilli()) - if err != nil { - level.Error(p.logger).Log("msg", "failed to list offsets after timestamp", "err", err) - return nil, err - } - - offsets, err := p.offsetReader.GroupLag(ctx) - if err != nil { - level.Error(p.logger).Log("msg", "failed to get group lag", "err", err) - return nil, err - } - - var jobs []*JobWithPriority[int] - for _, partitionOffset := range offsets { - startOffset := partitionOffset.Commit.At + 1 - // TODO: we could further break down the work into Interval sized chunks if this partition has pending records spanning a long time range - // or have the builder consume in chunks and commit the job status back to scheduler. - endOffset := consumeUptoOffsets[partitionOffset.Partition].Offset + endOffset := partitionOffset.End.Offset + // Skip if there's no lag if startOffset >= endOffset { - level.Info(p.logger).Log("msg", "no pending records to process", "partition", partitionOffset.Partition, - "commitOffset", partitionOffset.Commit.At, - "consumeUptoOffset", consumeUptoOffsets[partitionOffset.Partition].Offset) continue } - job := NewJobWithPriority( - types.NewJob(int(partitionOffset.Partition), types.Offsets{ - Min: startOffset, - Max: endOffset, - }), int(endOffset-startOffset), - ) + // Create jobs of size targetRecordCount until we reach endOffset + for currentStart := startOffset; currentStart < endOffset; { + currentEnd := min(currentStart+p.targetRecordCount, endOffset) + + job := NewJobWithPriority( + types.NewJob(int(partitionOffset.Partition), types.Offsets{ + Min: currentStart, + Max: currentEnd, + }), int(endOffset-currentStart), // priority is remaining records to process + ) + jobs = append(jobs, job) - jobs = append(jobs, job) + currentStart = currentEnd + } } - // Sort jobs by partition number to ensure consistent ordering + // Sort jobs by partition then priority sort.Slice(jobs, func(i, j int) bool { - return jobs[i].Job.Partition < jobs[j].Job.Partition + if jobs[i].Job.Partition != jobs[j].Job.Partition { + return jobs[i].Job.Partition < jobs[j].Job.Partition + } + return jobs[i].Priority > jobs[j].Priority }) return jobs, nil diff --git a/pkg/blockbuilder/scheduler/strategy_test.go b/pkg/blockbuilder/scheduler/strategy_test.go index 30cbd1ee8a172..9c7b732fb4e08 100644 --- a/pkg/blockbuilder/scheduler/strategy_test.go +++ b/pkg/blockbuilder/scheduler/strategy_test.go @@ -5,120 +5,141 @@ import ( "testing" "time" - "github.com/go-kit/log" "github.com/stretchr/testify/require" "github.com/twmb/franz-go/pkg/kadm" "github.com/grafana/loki/v3/pkg/blockbuilder/types" ) -func TestTimeRangePlanner_Plan(t *testing.T) { - interval := 15 * time.Minute +type mockOffsetReader struct { + groupLag map[int32]kadm.GroupMemberLag +} + +func (m *mockOffsetReader) GroupLag(_ context.Context) (map[int32]kadm.GroupMemberLag, error) { + return m.groupLag, nil +} + +func TestRecordCountPlanner_Plan(t *testing.T) { for _, tc := range []struct { name string - now time.Time + recordCount int64 expectedJobs []*JobWithPriority[int] groupLag map[int32]kadm.GroupMemberLag - consumeUpto map[int32]kadm.ListedOffset }{ { - name: "normal case. 
schedule first interval",
-			now:  time.Date(0, 0, 0, 0, 42, 0, 0, time.UTC), // 00:42:00
+			name:        "single partition, single job",
+			recordCount: 100,
 			groupLag: map[int32]kadm.GroupMemberLag{
 				0: {
 					Commit: kadm.Offset{
 						At: 100,
 					},
+					End: kadm.ListedOffset{
+						Offset: 150,
+					},
 					Partition: 0,
 				},
 			},
-			consumeUpto: map[int32]kadm.ListedOffset{
-				0: {
-					Offset: 200,
-				},
-			},
 			expectedJobs: []*JobWithPriority[int]{
 				NewJobWithPriority(
-					types.NewJob(0, types.Offsets{Min: 101, Max: 200}),
-					99, // 200-101
+					types.NewJob(0, types.Offsets{Min: 101, Max: 150}),
+					49, // 150-101
 				),
 			},
 		},
 		{
-			name: "normal case. schedule second interval",
-			now:  time.Date(0, 0, 0, 0, 46, 0, 0, time.UTC), // 00:46:00
+			name:        "single partition, multiple jobs",
+			recordCount: 50,
 			groupLag: map[int32]kadm.GroupMemberLag{
 				0: {
 					Commit: kadm.Offset{
-						At: 199,
+						At: 100,
 					},
-					Partition: 0,
-				},
-				1: {
-					Commit: kadm.Offset{
-						At: 11,
+					End: kadm.ListedOffset{
+						Offset: 200,
 					},
-					Partition: 1,
-				},
-			},
-			consumeUpto: map[int32]kadm.ListedOffset{
-				0: {
-					Offset: 300,
-				},
-				1: {
-					Offset: 123,
+					Partition: 0,
 				},
 			},
 			expectedJobs: []*JobWithPriority[int]{
 				NewJobWithPriority(
-					types.NewJob(0, types.Offsets{Min: 200, Max: 300}),
-					100, // 300-200
+					types.NewJob(0, types.Offsets{Min: 101, Max: 151}),
+					99, // priority is total remaining: 200-101
 				),
 				NewJobWithPriority(
-					types.NewJob(1, types.Offsets{Min: 12, Max: 123}),
-					111, // 123-12
+					types.NewJob(0, types.Offsets{Min: 151, Max: 200}),
+					49, // priority is total remaining: 200-151
 				),
 			},
 		},
 		{
-			name: "no pending records to consume. schedule second interval once more time",
-			now:  time.Date(0, 0, 0, 0, 48, 0, 0, time.UTC), // 00:48:00
+			name:        "multiple partitions",
+			recordCount: 100,
 			groupLag: map[int32]kadm.GroupMemberLag{
 				0: {
 					Commit: kadm.Offset{
-						At: 299,
+						At: 100,
+					},
+					End: kadm.ListedOffset{
+						Offset: 150,
 					},
 					Partition: 0,
 				},
 				1: {
 					Commit: kadm.Offset{
-						At: 11,
+						At: 200,
+					},
+					End: kadm.ListedOffset{
+						Offset: 400,
 					},
 					Partition: 1,
 				},
 			},
-			consumeUpto: map[int32]kadm.ListedOffset{
-				0: {
-					Offset: 300,
-				},
-				1: {
-					Offset: 123,
-				},
-			},
 			expectedJobs: []*JobWithPriority[int]{
 				NewJobWithPriority(
-					types.NewJob(1, types.Offsets{Min: 12, Max: 123}),
-					111, // 123-12
+					types.NewJob(1, types.Offsets{Min: 201, Max: 301}),
+					199, // priority is total remaining: 400-201
+				),
+				NewJobWithPriority(
+					types.NewJob(1, types.Offsets{Min: 301, Max: 400}),
+					99, // priority is total remaining: 400-301
 				),
+				NewJobWithPriority(
+					types.NewJob(0, types.Offsets{Min: 101, Max: 150}),
+					49, // priority is total remaining: 150-101
+				),
+			},
+		},
+		{
+			name:        "no lag",
+			recordCount: 100,
+			groupLag: map[int32]kadm.GroupMemberLag{
+				0: {
+					Commit: kadm.Offset{
+						At: 100,
+					},
+					End: kadm.ListedOffset{
+						Offset: 100,
+					},
+					Partition: 0,
+				},
 			},
+			expectedJobs: nil,
 		},
 	} {
 		t.Run(tc.name, func(t *testing.T) {
-			mockOffsetReader := &mockOffsetReader{
-				offsetsAfterMilli: tc.consumeUpto,
-				groupLag:          tc.groupLag,
+			mockReader := &mockOffsetReader{
+				groupLag: tc.groupLag,
+			}
+			cfg := Config{
+				Interval:          time.Second, // must be > 0 to pass validation
+				Strategy:          RecordCountStrategy,
+				TargetRecordCount: tc.recordCount,
 			}
-			planner := NewTimeRangePlanner(interval, mockOffsetReader, func() time.Time { return tc.now }, log.NewNopLogger())
+			require.NoError(t, cfg.Validate())
+
+			planner := NewRecordCountPlanner(tc.recordCount)
+			planner.offsetReader = mockReader
 
 			jobs, err := planner.Plan(context.Background())
 			require.NoError(t, err)
@@ -128,16 +149,3 @@ func TestTimeRangePlanner_Plan(t *testing.T) {
 		})
 	}
 
} - -type mockOffsetReader struct { - offsetsAfterMilli map[int32]kadm.ListedOffset - groupLag map[int32]kadm.GroupMemberLag -} - -func (m *mockOffsetReader) ListOffsetsAfterMilli(_ context.Context, _ int64) (map[int32]kadm.ListedOffset, error) { - return m.offsetsAfterMilli, nil -} - -func (m *mockOffsetReader) GroupLag(_ context.Context) (map[int32]kadm.GroupMemberLag, error) { - return m.groupLag, nil -}
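
For reference, the core of the new planning behaviour is the per-partition chunking loop in RecordCountPlanner.Plan: it walks the lag between the committed offset and the partition end in steps of target_record_count, giving each job a priority equal to the records still remaining when that job starts. The sketch below is a minimal, self-contained illustration of that loop, not the patch's actual code; the planPartition helper and jobRange type are hypothetical names for this example, and it relies on the Go 1.21 builtin min, as the patch itself does. The numbers mirror the "single partition, multiple jobs" test case above.

```go
package main

import "fmt"

// jobRange is an illustrative stand-in for the patch's job-plus-priority pair:
// [Min, Max) is the offset window a job consumes, Priority is the number of
// records still pending at the moment the job starts.
type jobRange struct {
	Min, Max int64
	Priority int64
}

// planPartition mimics the chunking loop for a single partition: starting at
// committed+1, emit jobs of at most targetRecordCount records until the
// partition end offset is reached.
func planPartition(committed, end, targetRecordCount int64) []jobRange {
	start := committed + 1
	if start >= end || targetRecordCount <= 0 {
		return nil // no lag (or no usable target): nothing to plan
	}
	var jobs []jobRange
	for cur := start; cur < end; {
		next := min(cur+targetRecordCount, end)
		jobs = append(jobs, jobRange{Min: cur, Max: next, Priority: end - cur})
		cur = next
	}
	return jobs
}

func main() {
	// Committed offset 100, partition end 200, target 50 records per job:
	// yields [101,151) with priority 99 and [151,200) with priority 49.
	for _, j := range planPartition(100, 200, 50) {
		fmt.Printf("job [%d, %d) priority=%d\n", j.Min, j.Max, j.Priority)
	}
}
```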