diff --git a/pkg/blockbuilder/architecture.md b/pkg/blockbuilder/architecture.md new file mode 100644 index 0000000000000..633d7340ccca3 --- /dev/null +++ b/pkg/blockbuilder/architecture.md @@ -0,0 +1,159 @@ +# Block Builder Architecture + +## Overview + +The Block Builder and Block Scheduler are separate components designed to build storage formats from ingested Kafka data. The Block Scheduler coordinates job distribution to multiple Block Builder instances, implementing a pull-based architecture that decouples read and write operations, allowing for independent scaling and simpler operational management. This document describes the architecture and interaction between components. + +## Package Structure + +The Block Builder system is organized into three main packages: + +### pkg/blockbuilder/types +- Contains shared type definitions and interfaces +- Defines core data structures like `Job` and `Offsets` +- Provides interface definitions for: + - `Worker`: Interface for processing jobs and reporting status + - `Scheduler`: Interface for job scheduling and worker management + - `Transport`: Interface for communication between components + +### pkg/blockbuilder/scheduler +- Implements the job queue and scheduling logic +- Manages job distribution to block builders +- Tracks job progress and ensures exactly-once processing +- Handles job state management and offset tracking + +### pkg/blockbuilder/builder +- Implements the block builder worker functionality +- Processes assigned jobs and builds storage formats +- Manages transport layer communication +- Handles data processing and object storage interactions + +## Component Diagram + +```mermaid +graph TB + subgraph Kafka + KP[Kafka Partitions] + end + + subgraph Block Scheduler + S[Scheduler] + Q[Job Queue] + PC[Partition Controller] + + subgraph Transport Layer + T[gRPC/Transport Interface] + end + end + + subgraph Block Builders + BB1[Block Builder 1] + BB2[Block Builder 2] + BB3[Block Builder N] + end + + subgraph Storage + OS[Object Storage] + end + + KP --> PC + PC --> S + S <--> Q + S <--> T + T <--> BB1 + T <--> BB2 + T <--> BB3 + BB1 --> OS + BB2 --> OS + BB3 --> OS +``` + +## Job Processing Sequence + +```mermaid +sequenceDiagram + participant PC as Partition Controller + participant S as Block Scheduler + participant Q as Queue + participant T as Transport + participant BB as Block Builder + participant OS as Object Storage + + loop Monitor Partitions + PC->>PC: Check for new offsets + PC->>S: Create Job (partition, offset range) + S->>Q: Enqueue Job + end + + BB->>T: Request Job + T->>S: Forward Request + S->>Q: Dequeue Job + Q-->>S: Return Job (or empty) + alt Has Job + S->>T: Send Job + T->>BB: Forward Job + BB->>OS: Process & Write Data + BB->>T: Report Success + T->>S: Forward Status + S->>PC: Commit Offset + else No Job + S->>T: Send No Job Available + T->>BB: Forward Response + end +``` + +## Core Components + +### Job and Offsets +- `Job`: Represents a unit of work for processing Kafka data + - Contains a partition ID and an offset range + - Immutable data structure that can be safely passed between components +- `Offsets`: Defines a half-open range [min,max) of Kafka offsets to process + - Used to track progress and ensure exactly-once processing + +### Block Scheduler +- Central component responsible for: + - Managing the job queue + - Coordinating Block Builder assignments + - Tracking job progress +- Implements a pull-based model where Block Builders request jobs +- Decoupled from specific transport mechanisms through 
the Transport interface + +### Block Builder +- Processes jobs assigned by the Block Scheduler +- Responsible for: + - Building storage formats from Kafka data + - Writing completed blocks to object storage + - Reporting job status back to scheduler +- Implements the Worker interface for job processing + +### Transport Layer +- Provides communication between Block Builders and Scheduler +- Abstracts transport mechanism (currently in-memory & gRPC) +- Defines message types for: + - Job requests + - Job completion notifications + - Job synchronization + +## Design Principles + +### Decoupled I/O +- Business logic is separated from I/O operations +- Transport interface allows for different communication mechanisms +- Enables easier testing through mock implementations + +### Stateless Design +- Block Builders are stateless workers +- All state is managed by the Scheduler +- Allows for easy scaling and failover + +### Pull-Based Architecture +- Block Builders pull jobs when ready +- Natural load balancing +- Prevents overloading of workers + + +### Interface-Driven Development +- Core components defined by interfaces +- Allows for multiple implementations +- Facilitates testing and modularity diff --git a/pkg/blockbuilder/builder/builder_test.go b/pkg/blockbuilder/builder/builder_test.go new file mode 100644 index 0000000000000..ac9890526f1d7 --- /dev/null +++ b/pkg/blockbuilder/builder/builder_test.go @@ -0,0 +1,16 @@ +package builder + +import ( + "github.com/grafana/loki/v3/pkg/blockbuilder/types" +) + +// TestBuilder implements Worker interface for testing +type TestBuilder struct { + *Worker +} + +func NewTestBuilder(builderID string, transport types.Transport) *TestBuilder { + return &TestBuilder{ + Worker: NewWorker(builderID, transport), + } +} diff --git a/pkg/blockbuilder/controller.go b/pkg/blockbuilder/builder/controller.go similarity index 92% rename from pkg/blockbuilder/controller.go rename to pkg/blockbuilder/builder/controller.go index 1193119525bd2..60570daa912cb 100644 --- a/pkg/blockbuilder/controller.go +++ b/pkg/blockbuilder/builder/controller.go @@ -1,4 +1,4 @@ -package blockbuilder +package builder import ( "context" @@ -7,26 +7,16 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" - "github.com/prometheus/prometheus/model/labels" - "github.com/grafana/dskit/backoff" + "github.com/prometheus/prometheus/model/labels" + "github.com/grafana/loki/v3/pkg/blockbuilder/types" "github.com/grafana/loki/v3/pkg/kafka" "github.com/grafana/loki/v3/pkg/kafka/partition" "github.com/grafana/loki/pkg/push" ) -// [min,max) -type Offsets struct { - Min, Max int64 -} - -type Job struct { - Partition int32 - Offsets Offsets -} - // Interface required for interacting with queue partitions. type PartitionController interface { Topic() string @@ -43,7 +33,7 @@ type PartitionController interface { // so it's advised to not buffer the channel for natural backpressure. // As a convenience, it returns the last seen offset, which matches // the final record sent on the channel. 
- Process(context.Context, Offsets, chan<- []AppendInput) (int64, error) + Process(context.Context, types.Offsets, chan<- []AppendInput) (int64, error) Close() error } @@ -125,7 +115,7 @@ func (l *PartitionJobController) EarliestPartitionOffset(ctx context.Context) (i ) } -func (l *PartitionJobController) Process(ctx context.Context, offsets Offsets, ch chan<- []AppendInput) (int64, error) { +func (l *PartitionJobController) Process(ctx context.Context, offsets types.Offsets, ch chan<- []AppendInput) (int64, error) { l.part.SetOffsetForConsumption(offsets.Min) var ( @@ -188,16 +178,16 @@ func (l *PartitionJobController) Process(ctx context.Context, offsets Offsets, c // LoadJob(ctx) returns the next job by finding the most recent unconsumed offset in the partition // Returns whether an applicable job exists, the job, and an error -func (l *PartitionJobController) LoadJob(ctx context.Context) (bool, Job, error) { +func (l *PartitionJobController) LoadJob(ctx context.Context) (bool, *types.Job, error) { // Read the most recent committed offset committedOffset, err := l.HighestCommittedOffset(ctx) if err != nil { - return false, Job{}, err + return false, nil, err } earliestOffset, err := l.EarliestPartitionOffset(ctx) if err != nil { - return false, Job{}, err + return false, nil, err } startOffset := committedOffset + 1 @@ -207,28 +197,27 @@ func (l *PartitionJobController) LoadJob(ctx context.Context) (bool, Job, error) highestOffset, err := l.HighestPartitionOffset(ctx) if err != nil { - return false, Job{}, err + return false, nil, err } if highestOffset < committedOffset { level.Error(l.logger).Log("msg", "partition highest offset is less than committed offset", "highest", highestOffset, "committed", committedOffset) - return false, Job{}, fmt.Errorf("partition highest offset is less than committed offset") + return false, nil, fmt.Errorf("partition highest offset is less than committed offset") } if highestOffset == committedOffset { level.Info(l.logger).Log("msg", "no pending records to process") - return false, Job{}, nil + return false, nil, nil } // Create the job with the calculated offsets - job := Job{ - Partition: l.part.Partition(), - Offsets: Offsets{ - Min: startOffset, - Max: min(startOffset+l.stepLen, highestOffset), - }, + offsets := types.Offsets{ + Min: startOffset, + Max: min(startOffset+l.stepLen, highestOffset), } + // Convert partition from int32 to int + job := types.NewJob(int(l.part.Partition()), offsets) return true, job, nil } @@ -279,7 +268,7 @@ func (d *dummyPartitionController) Commit(_ context.Context, offset int64) error return nil } -func (d *dummyPartitionController) Process(ctx context.Context, offsets Offsets, ch chan<- []AppendInput) (int64, error) { +func (d *dummyPartitionController) Process(ctx context.Context, offsets types.Offsets, ch chan<- []AppendInput) (int64, error) { for i := int(offsets.Min); i < int(offsets.Max); i++ { batch := d.createBatch(i) select { diff --git a/pkg/blockbuilder/metrics.go b/pkg/blockbuilder/builder/metrics.go similarity index 99% rename from pkg/blockbuilder/metrics.go rename to pkg/blockbuilder/builder/metrics.go index 31679e34f4466..3411985209533 100644 --- a/pkg/blockbuilder/metrics.go +++ b/pkg/blockbuilder/builder/metrics.go @@ -1,4 +1,4 @@ -package blockbuilder +package builder import ( "github.com/prometheus/client_golang/prometheus" diff --git a/pkg/blockbuilder/pipeline.go b/pkg/blockbuilder/builder/pipeline.go similarity index 99% rename from pkg/blockbuilder/pipeline.go rename to 
pkg/blockbuilder/builder/pipeline.go index 494763d8c83f3..eadefb5f9bb43 100644 --- a/pkg/blockbuilder/pipeline.go +++ b/pkg/blockbuilder/builder/pipeline.go @@ -1,4 +1,4 @@ -package blockbuilder +package builder import ( "context" diff --git a/pkg/blockbuilder/pipeline_test.go b/pkg/blockbuilder/builder/pipeline_test.go similarity index 98% rename from pkg/blockbuilder/pipeline_test.go rename to pkg/blockbuilder/builder/pipeline_test.go index 9ec69d2006ebe..be3246318b20e 100644 --- a/pkg/blockbuilder/pipeline_test.go +++ b/pkg/blockbuilder/builder/pipeline_test.go @@ -1,4 +1,4 @@ -package blockbuilder +package builder import ( "context" diff --git a/pkg/blockbuilder/slimgester.go b/pkg/blockbuilder/builder/slimgester.go similarity index 99% rename from pkg/blockbuilder/slimgester.go rename to pkg/blockbuilder/builder/slimgester.go index 249a061aa73df..1807892cbec05 100644 --- a/pkg/blockbuilder/slimgester.go +++ b/pkg/blockbuilder/builder/slimgester.go @@ -1,4 +1,4 @@ -package blockbuilder +package builder import ( "bytes" diff --git a/pkg/blockbuilder/storage.go b/pkg/blockbuilder/builder/storage.go similarity index 99% rename from pkg/blockbuilder/storage.go rename to pkg/blockbuilder/builder/storage.go index 2815b9b97ad86..859f541b5daf2 100644 --- a/pkg/blockbuilder/storage.go +++ b/pkg/blockbuilder/builder/storage.go @@ -1,4 +1,4 @@ -package blockbuilder +package builder import ( "context" diff --git a/pkg/blockbuilder/storage_test.go b/pkg/blockbuilder/builder/storage_test.go similarity index 97% rename from pkg/blockbuilder/storage_test.go rename to pkg/blockbuilder/builder/storage_test.go index 8fc6b237e132d..7f281da48ec62 100644 --- a/pkg/blockbuilder/storage_test.go +++ b/pkg/blockbuilder/builder/storage_test.go @@ -1,4 +1,4 @@ -package blockbuilder +package builder import ( "os" diff --git a/pkg/blockbuilder/builder/transport.go b/pkg/blockbuilder/builder/transport.go new file mode 100644 index 0000000000000..ae498459cb667 --- /dev/null +++ b/pkg/blockbuilder/builder/transport.go @@ -0,0 +1,58 @@ +package builder + +import ( + "context" + + "github.com/grafana/loki/v3/pkg/blockbuilder/types" +) + +var ( + _ types.Transport = unimplementedTransport{} + _ types.Transport = &MemoryTransport{} +) + +// unimplementedTransport provides default implementations that panic +type unimplementedTransport struct{} + +func (t unimplementedTransport) SendGetJobRequest(_ context.Context, _ *types.GetJobRequest) (*types.GetJobResponse, error) { + panic("unimplemented") +} + +func (t unimplementedTransport) SendCompleteJob(_ context.Context, _ *types.CompleteJobRequest) error { + panic("unimplemented") +} + +func (t unimplementedTransport) SendSyncJob(_ context.Context, _ *types.SyncJobRequest) error { + panic("unimplemented") +} + +// MemoryTransport implements Transport interface for in-memory communication +type MemoryTransport struct { + scheduler types.Scheduler +} + +// NewMemoryTransport creates a new in-memory transport instance +func NewMemoryTransport(scheduler types.Scheduler) *MemoryTransport { + return &MemoryTransport{ + scheduler: scheduler, + } +} + +func (t *MemoryTransport) SendGetJobRequest(ctx context.Context, req *types.GetJobRequest) (*types.GetJobResponse, error) { + job, ok, err := t.scheduler.HandleGetJob(ctx, req.BuilderID) + if err != nil { + return nil, err + } + return &types.GetJobResponse{ + Job: job, + OK: ok, + }, nil +} + +func (t *MemoryTransport) SendCompleteJob(ctx context.Context, req *types.CompleteJobRequest) error { + return 
t.scheduler.HandleCompleteJob(ctx, req.BuilderID, req.Job) +} + +func (t *MemoryTransport) SendSyncJob(ctx context.Context, req *types.SyncJobRequest) error { + return t.scheduler.HandleSyncJob(ctx, req.BuilderID, req.Job) +} diff --git a/pkg/blockbuilder/tsdb.go b/pkg/blockbuilder/builder/tsdb.go similarity index 99% rename from pkg/blockbuilder/tsdb.go rename to pkg/blockbuilder/builder/tsdb.go index 8af463fcd27da..e90bedb3815ad 100644 --- a/pkg/blockbuilder/tsdb.go +++ b/pkg/blockbuilder/builder/tsdb.go @@ -1,4 +1,4 @@ -package blockbuilder +package builder import ( "bytes" diff --git a/pkg/blockbuilder/builder/worker.go b/pkg/blockbuilder/builder/worker.go new file mode 100644 index 0000000000000..41dc0cb7563ec --- /dev/null +++ b/pkg/blockbuilder/builder/worker.go @@ -0,0 +1,66 @@ +package builder + +import ( + "context" + + "github.com/grafana/loki/v3/pkg/blockbuilder/types" +) + +var ( + _ types.Worker = unimplementedWorker{} + _ types.Worker = &Worker{} +) + +// unimplementedWorker provides default implementations for the Worker interface. +type unimplementedWorker struct{} + +func (u unimplementedWorker) GetJob(_ context.Context) (*types.Job, bool, error) { + panic("unimplemented") +} + +func (u unimplementedWorker) CompleteJob(_ context.Context, _ *types.Job) error { + panic("unimplemented") +} + +func (u unimplementedWorker) SyncJob(_ context.Context, _ *types.Job) error { + panic("unimplemented") +} + +// Worker is the implementation of the Worker interface. +type Worker struct { + unimplementedWorker + transport types.Transport + builderID string +} + +// NewWorker creates a new Worker instance. +func NewWorker(builderID string, transport types.Transport) *Worker { + return &Worker{ + transport: transport, + builderID: builderID, + } +} + +func (w *Worker) GetJob(ctx context.Context) (*types.Job, bool, error) { + resp, err := w.transport.SendGetJobRequest(ctx, &types.GetJobRequest{ + BuilderID: w.builderID, + }) + if err != nil { + return nil, false, err + } + return resp.Job, resp.OK, nil +} + +func (w *Worker) CompleteJob(ctx context.Context, job *types.Job) error { + return w.transport.SendCompleteJob(ctx, &types.CompleteJobRequest{ + BuilderID: w.builderID, + Job: job, + }) +} + +func (w *Worker) SyncJob(ctx context.Context, job *types.Job) error { + return w.transport.SendSyncJob(ctx, &types.SyncJobRequest{ + BuilderID: w.builderID, + Job: job, + }) +} diff --git a/pkg/blockbuilder/scheduler/queue.go b/pkg/blockbuilder/scheduler/queue.go new file mode 100644 index 0000000000000..3e9cf087c6792 --- /dev/null +++ b/pkg/blockbuilder/scheduler/queue.go @@ -0,0 +1,106 @@ +package scheduler + +import ( + "fmt" + "sync" + + "github.com/grafana/loki/v3/pkg/blockbuilder/types" +) + +// jobAssignment tracks a job and its assigned builder +type jobAssignment struct { + job *types.Job + builderID string +} + +// JobQueue manages the queue of pending jobs and tracks their state. 
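+// Lifecycle, as implemented below: Enqueue places a job in pending; Dequeue
+// hands it to a builder and moves it to in-progress; MarkComplete moves it to
+// completed; SyncJob refreshes the in-progress copy. All three maps are keyed
+// by the job ID and guarded by mu.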
+type JobQueue struct { + pending map[string]*types.Job // Jobs waiting to be processed, key is job ID + inProgress map[string]*jobAssignment // job ID -> assignment info + completed map[string]*types.Job // Completed jobs, key is job ID + mu sync.RWMutex +} + +// NewJobQueue creates a new job queue instance +func NewJobQueue() *JobQueue { + return &JobQueue{ + pending: make(map[string]*types.Job), + inProgress: make(map[string]*jobAssignment), + completed: make(map[string]*types.Job), + } +} + +// Enqueue adds a new job to the pending queue +// This is a naive implementation, intended to be refactored +func (q *JobQueue) Enqueue(job *types.Job) error { + q.mu.Lock() + defer q.mu.Unlock() + + if _, exists := q.pending[job.ID]; exists { + return fmt.Errorf("job %s already exists in pending queue", job.ID) + } + if _, exists := q.inProgress[job.ID]; exists { + return fmt.Errorf("job %s already exists in progress", job.ID) + } + if _, exists := q.completed[job.ID]; exists { + return fmt.Errorf("job %s already completed", job.ID) + } + + q.pending[job.ID] = job + return nil +} + +// Dequeue gets the next available job and assigns it to a builder +func (q *JobQueue) Dequeue(builderID string) (*types.Job, bool, error) { + q.mu.Lock() + defer q.mu.Unlock() + + // Simple FIFO for now + for id, job := range q.pending { + delete(q.pending, id) + q.inProgress[id] = &jobAssignment{ + job: job, + builderID: builderID, + } + return job, true, nil + } + + return nil, false, nil +} + +// MarkComplete moves a job from in-progress to completed +func (q *JobQueue) MarkComplete(jobID string, builderID string) error { + q.mu.Lock() + defer q.mu.Unlock() + + assignment, exists := q.inProgress[jobID] + if !exists { + return fmt.Errorf("job %s not found in progress", jobID) + } + + if assignment.builderID != builderID { + return fmt.Errorf("job %s not assigned to builder %s", jobID, builderID) + } + + delete(q.inProgress, jobID) + q.completed[jobID] = assignment.job + return nil +} + +// SyncJob updates the state of an in-progress job +func (q *JobQueue) SyncJob(jobID string, builderID string, job *types.Job) error { + q.mu.Lock() + defer q.mu.Unlock() + + assignment, exists := q.inProgress[jobID] + if !exists { + return fmt.Errorf("job %s not found in progress", jobID) + } + + if assignment.builderID != builderID { + return fmt.Errorf("job %s not assigned to builder %s", jobID, builderID) + } + + assignment.job = job + return nil +} diff --git a/pkg/blockbuilder/scheduler/scheduler.go b/pkg/blockbuilder/scheduler/scheduler.go new file mode 100644 index 0000000000000..274713b5b1c36 --- /dev/null +++ b/pkg/blockbuilder/scheduler/scheduler.go @@ -0,0 +1,56 @@ +package scheduler + +import ( + "context" + + "github.com/grafana/loki/v3/pkg/blockbuilder/types" +) + +var ( + _ types.Scheduler = unimplementedScheduler{} + _ types.Scheduler = &QueueScheduler{} +) + +// unimplementedScheduler provides default implementations that panic. 
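+// It can be embedded by partial Scheduler implementations so that any method
+// left unimplemented fails loudly at runtime instead of silently succeeding.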
+type unimplementedScheduler struct{} + +func (s unimplementedScheduler) HandleGetJob(_ context.Context, _ string) (*types.Job, bool, error) { + panic("unimplemented") +} + +func (s unimplementedScheduler) HandleCompleteJob(_ context.Context, _ string, _ *types.Job) error { + panic("unimplemented") +} + +func (s unimplementedScheduler) HandleSyncJob(_ context.Context, _ string, _ *types.Job) error { + panic("unimplemented") +} + +// QueueScheduler implements the Scheduler interface +type QueueScheduler struct { + queue *JobQueue +} + +// NewScheduler creates a new scheduler instance +func NewScheduler(queue *JobQueue) *QueueScheduler { + return &QueueScheduler{ + queue: queue, + } +} + +func (s *QueueScheduler) HandleGetJob(ctx context.Context, builderID string) (*types.Job, bool, error) { + select { + case <-ctx.Done(): + return nil, false, ctx.Err() + default: + return s.queue.Dequeue(builderID) + } +} + +func (s *QueueScheduler) HandleCompleteJob(_ context.Context, builderID string, job *types.Job) error { + return s.queue.MarkComplete(job.ID, builderID) +} + +func (s *QueueScheduler) HandleSyncJob(_ context.Context, builderID string, job *types.Job) error { + return s.queue.SyncJob(job.ID, builderID, job) +} diff --git a/pkg/blockbuilder/scheduler/scheduler_test.go b/pkg/blockbuilder/scheduler/scheduler_test.go new file mode 100644 index 0000000000000..ad6829bc8fe69 --- /dev/null +++ b/pkg/blockbuilder/scheduler/scheduler_test.go @@ -0,0 +1,148 @@ +package scheduler + +import ( + "context" + "testing" + "time" + + "github.com/grafana/loki/v3/pkg/blockbuilder/builder" + "github.com/grafana/loki/v3/pkg/blockbuilder/types" +) + +type testEnv struct { + queue *JobQueue + scheduler *QueueScheduler + transport *builder.MemoryTransport + builder *builder.Worker +} + +func newTestEnv(builderID string) *testEnv { + queue := NewJobQueue() + scheduler := NewScheduler(queue) + transport := builder.NewMemoryTransport(scheduler) + builder := builder.NewWorker(builderID, builder.NewMemoryTransport(scheduler)) + + return &testEnv{ + queue: queue, + scheduler: scheduler, + transport: transport, + builder: builder, + } +} + +func TestScheduleAndProcessJob(t *testing.T) { + env := newTestEnv("test-builder-1") + ctx := context.Background() + + // Create and enqueue a test job + job := types.NewJob(1, types.Offsets{Min: 100, Max: 200}) + err := env.queue.Enqueue(job) + if err != nil { + t.Fatalf("failed to enqueue job: %v", err) + } + + // Builder gets job + receivedJob, ok, err := env.builder.GetJob(ctx) + if err != nil { + t.Fatalf("failed to get job: %v", err) + } + if !ok { + t.Fatal("expected to receive job") + } + if receivedJob.ID != job.ID { + t.Errorf("got job ID %s, want %s", receivedJob.ID, job.ID) + } + + // Builder completes job + err = env.builder.CompleteJob(ctx, receivedJob) + if err != nil { + t.Fatalf("failed to complete job: %v", err) + } + + // Try to get another job (should be none available) + _, ok, err = env.builder.GetJob(ctx) + if err != nil { + t.Fatalf("failed to get second job: %v", err) + } + if ok { + t.Error("got unexpected second job") + } +} + +func TestContextCancellation(t *testing.T) { + env := newTestEnv("test-builder-1") + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond) + defer cancel() + + // Try to get job after context timeout + time.Sleep(20 * time.Millisecond) + _, _, err := env.builder.GetJob(ctx) + if err == nil { + t.Error("expected error from cancelled context") + } +} + +func TestMultipleBuilders(t *testing.T) { + // Create 
first environment + env1 := newTestEnv("test-builder-1") + // Create second builder using same scheduler + builder2 := builder.NewWorker("test-builder-2", builder.NewMemoryTransport(env1.scheduler)) + + ctx := context.Background() + + // Create test jobs + job1 := types.NewJob(1, types.Offsets{Min: 100, Max: 200}) + job2 := types.NewJob(2, types.Offsets{Min: 300, Max: 400}) + + // Enqueue jobs + err := env1.queue.Enqueue(job1) + if err != nil { + t.Fatalf("failed to enqueue job1: %v", err) + } + err = env1.queue.Enqueue(job2) + if err != nil { + t.Fatalf("failed to enqueue job2: %v", err) + } + + // Builders get jobs + receivedJob1, ok, err := env1.builder.GetJob(ctx) + if err != nil { + t.Fatalf("builder1 failed to get job: %v", err) + } + if !ok { + t.Fatal("builder1 expected to receive job") + } + + receivedJob2, ok, err := builder2.GetJob(ctx) + if err != nil { + t.Fatalf("builder2 failed to get job: %v", err) + } + if !ok { + t.Fatal("builder2 expected to receive job") + } + + // Verify different jobs were assigned + if receivedJob1.ID == receivedJob2.ID { + t.Error("builders received same job") + } + + // Complete jobs + err = env1.builder.CompleteJob(ctx, receivedJob1) + if err != nil { + t.Fatalf("builder1 failed to complete job: %v", err) + } + + err = builder2.CompleteJob(ctx, receivedJob2) + if err != nil { + t.Fatalf("builder2 failed to complete job: %v", err) + } + + // Try to get more jobs (should be none available) + _, ok, err = env1.builder.GetJob(ctx) + if err != nil { + t.Fatalf("builder1 failed to get second job: %v", err) + } + if ok { + t.Error("builder1 got unexpected second job") + } +} diff --git a/pkg/blockbuilder/types/interfaces.go b/pkg/blockbuilder/types/interfaces.go new file mode 100644 index 0000000000000..74267f912fd7e --- /dev/null +++ b/pkg/blockbuilder/types/interfaces.go @@ -0,0 +1,53 @@ +package types + +import "context" + +// Worker interface defines the methods for processing jobs and reporting status. +type Worker interface { + // GetJob requests a new job from the scheduler + GetJob(ctx context.Context) (*Job, bool, error) + // CompleteJob marks a job as finished + CompleteJob(ctx context.Context, job *Job) error + // SyncJob informs the scheduler about an in-progress job + SyncJob(ctx context.Context, job *Job) error +} + +// Scheduler interface defines the methods for scheduling jobs and managing worker pools. 
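+// Each handler is the scheduler-side counterpart of the matching Worker method
+// (GetJob, CompleteJob, SyncJob), reached through a Transport implementation.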
+type Scheduler interface { + // HandleGetJob processes a job request from a block builder + HandleGetJob(ctx context.Context, builderID string) (*Job, bool, error) + // HandleCompleteJob processes a job completion notification + HandleCompleteJob(ctx context.Context, builderID string, job *Job) error + // HandleSyncJob processes a job sync request + HandleSyncJob(ctx context.Context, builderID string, job *Job) error +} + +// Transport defines the interface for communication between block builders and scheduler +type Transport interface { + // SendGetJobRequest sends a request to get a new job + SendGetJobRequest(ctx context.Context, req *GetJobRequest) (*GetJobResponse, error) + // SendCompleteJob sends a job completion notification + SendCompleteJob(ctx context.Context, req *CompleteJobRequest) error + // SendSyncJob sends a job sync request + SendSyncJob(ctx context.Context, req *SyncJobRequest) error +} + +// Request/Response message types +type GetJobRequest struct { + BuilderID string +} + +type GetJobResponse struct { + Job *Job + OK bool +} + +type CompleteJobRequest struct { + BuilderID string + Job *Job +} + +type SyncJobRequest struct { + BuilderID string + Job *Job +} diff --git a/pkg/blockbuilder/types/job.go b/pkg/blockbuilder/types/job.go new file mode 100644 index 0000000000000..d6ed42b598906 --- /dev/null +++ b/pkg/blockbuilder/types/job.go @@ -0,0 +1,42 @@ +package types + +import "fmt" + +// Job represents a block building task. +type Job struct { + ID string + Status JobStatus + // Partition and offset information + Partition int + Offsets Offsets +} + +// JobStatus represents the current state of a job +type JobStatus int + +const ( + JobStatusPending JobStatus = iota + JobStatusInProgress + JobStatusComplete +) + +// Offsets represents the range of offsets to process +type Offsets struct { + Min int64 + Max int64 +} + +// NewJob creates a new job with the given partition and offsets +func NewJob(partition int, offsets Offsets) *Job { + return &Job{ + ID: GenerateJobID(partition, offsets), + Status: JobStatusPending, + Partition: partition, + Offsets: offsets, + } +} + +// GenerateJobID creates a deterministic job ID from partition and offsets +func GenerateJobID(partition int, offsets Offsets) string { + return fmt.Sprintf("job-%d-%d-%d", partition, offsets.Min, offsets.Max) +} diff --git a/pkg/loki/loki.go b/pkg/loki/loki.go index ff5c6de1565bd..153387035d6b5 100644 --- a/pkg/loki/loki.go +++ b/pkg/loki/loki.go @@ -31,7 +31,7 @@ import ( "google.golang.org/grpc/health/grpc_health_v1" "github.com/grafana/loki/v3/pkg/analytics" - "github.com/grafana/loki/v3/pkg/blockbuilder" + blockbuilder "github.com/grafana/loki/v3/pkg/blockbuilder/builder" "github.com/grafana/loki/v3/pkg/bloombuild" "github.com/grafana/loki/v3/pkg/bloomgateway" "github.com/grafana/loki/v3/pkg/compactor" diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index 3c76434ef852c..994576076af3e 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -36,7 +36,7 @@ import ( "github.com/prometheus/common/model" "github.com/grafana/loki/v3/pkg/analytics" - "github.com/grafana/loki/v3/pkg/blockbuilder" + blockbuilder "github.com/grafana/loki/v3/pkg/blockbuilder/builder" "github.com/grafana/loki/v3/pkg/bloombuild/builder" "github.com/grafana/loki/v3/pkg/bloombuild/planner" bloomprotos "github.com/grafana/loki/v3/pkg/bloombuild/protos"
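Taken together, the new packages compose into the pull-based flow described in `architecture.md`. The sketch below is illustrative only — the `main` wrapper, builder ID, partition, and offset values are made up for the example — and wires a `JobQueue`, `QueueScheduler`, `MemoryTransport`, and `Worker` the same way `pkg/blockbuilder/scheduler/scheduler_test.go` does:

```go
package main

import (
	"context"
	"fmt"

	"github.com/grafana/loki/v3/pkg/blockbuilder/builder"
	"github.com/grafana/loki/v3/pkg/blockbuilder/scheduler"
	"github.com/grafana/loki/v3/pkg/blockbuilder/types"
)

func main() {
	ctx := context.Background()

	// Scheduler side: the queue holds job state, the scheduler serves it over a transport.
	queue := scheduler.NewJobQueue()
	sched := scheduler.NewScheduler(queue)

	// Builder side: a stateless worker that pulls jobs through the in-memory transport.
	worker := builder.NewWorker("builder-1", builder.NewMemoryTransport(sched))

	// Enqueue a job covering the half-open offset range [100, 200) on partition 1.
	if err := queue.Enqueue(types.NewJob(1, types.Offsets{Min: 100, Max: 200})); err != nil {
		panic(err)
	}

	// The worker pulls the job, would process it, then reports completion.
	job, ok, err := worker.GetJob(ctx)
	if err != nil || !ok {
		panic(fmt.Sprintf("expected a job: ok=%v err=%v", ok, err))
	}
	if err := worker.CompleteJob(ctx, job); err != nil {
		panic(err)
	}
	fmt.Println("completed", job.ID) // e.g. "completed job-1-100-200"
}
```

In the real system the job would be produced by the `PartitionJobController` from committed Kafka offsets rather than enqueued by hand, and the worker would build and upload blocks to object storage before calling `CompleteJob`.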