Skip to content

Commit

Permalink
refactor(blockbuilder): transport job completion reports success (#15313)
Browse files Browse the repository at this point in the history
  • Loading branch information
owen-d authored Dec 9, 2024
1 parent c519ab6 commit 0ddaca8
Show file tree
Hide file tree
Showing 7 changed files with 76 additions and 65 deletions.
16 changes: 10 additions & 6 deletions pkg/blockbuilder/builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,17 +260,21 @@ func (i *BlockBuilder) runOne(ctx context.Context, workerID string) (bool, error
i.metrics.inflightJobs.Set(float64(len(i.inflightJobs)))
i.jobsMtx.Unlock()

lastConsumedOffset, err := i.processJob(ctx, job, logger)
completion := &types.CompleteJobRequest{
BuilderID: workerID,
Job: job,
Success: true,
}
if _, err = i.processJob(ctx, job, logger); err != nil {
level.Error(i.logger).Log("msg", "failed to process job", "err", err)
completion.Success = false
}

if _, err := withBackoff(
ctx,
i.cfg.Backoff,
func() (res struct{}, err error) {
if err = i.SendCompleteJob(ctx, &types.CompleteJobRequest{
BuilderID: workerID,
Job: job,
LastConsumedOffset: lastConsumedOffset,
}); err != nil {
if err = i.SendCompleteJob(ctx, completion); err != nil {
level.Error(i.logger).Log("msg", "failed to mark the job as complete", "err", err)
}
return
Expand Down
5 changes: 3 additions & 2 deletions pkg/blockbuilder/builder/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func (u unimplementedWorker) GetJob(_ context.Context) (*types.Job, bool, error)
panic("unimplemented")
}

func (u unimplementedWorker) CompleteJob(_ context.Context, _ *types.Job) error {
func (u unimplementedWorker) CompleteJob(_ context.Context, _ *types.Job, _ bool) error {
panic("unimplemented")
}

Expand Down Expand Up @@ -51,10 +51,11 @@ func (w *Worker) GetJob(ctx context.Context) (*types.Job, bool, error) {
return resp.Job, resp.OK, nil
}

func (w *Worker) CompleteJob(ctx context.Context, job *types.Job) error {
func (w *Worker) CompleteJob(ctx context.Context, job *types.Job, success bool) error {
return w.transport.SendCompleteJob(ctx, &types.CompleteJobRequest{
BuilderID: w.builderID,
Job: job,
Success: success,
})
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/blockbuilder/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func TestScheduleAndProcessJob(t *testing.T) {
}

// Builder completes job
err = env.builder.CompleteJob(ctx, receivedJob)
err = env.builder.CompleteJob(ctx, receivedJob, true)
if err != nil {
t.Fatalf("failed to complete job: %v", err)
}
Expand Down Expand Up @@ -130,12 +130,12 @@ func TestMultipleBuilders(t *testing.T) {
}

// Complete jobs
err = env1.builder.CompleteJob(ctx, receivedJob1)
err = env1.builder.CompleteJob(ctx, receivedJob1, true)
if err != nil {
t.Fatalf("builder1 failed to complete job: %v", err)
}

err = builder2.CompleteJob(ctx, receivedJob2)
err = builder2.CompleteJob(ctx, receivedJob2, true)
if err != nil {
t.Fatalf("builder2 failed to complete job: %v", err)
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/blockbuilder/types/grpc_transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,9 @@ func (t *GRPCTransport) SendGetJobRequest(ctx context.Context, req *GetJobReques
// SendCompleteJob implements Transport
func (t *GRPCTransport) SendCompleteJob(ctx context.Context, req *CompleteJobRequest) error {
protoReq := &proto.CompleteJobRequest{
BuilderId: req.BuilderID,
Job: jobToProto(req.Job),
LastConsumedOffset: req.LastConsumedOffset,
BuilderId: req.BuilderID,
Job: jobToProto(req.Job),
Success: req.Success,
}

_, err := t.CompleteJob(ctx, protoReq)
Expand Down
8 changes: 4 additions & 4 deletions pkg/blockbuilder/types/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ type Worker interface {
// GetJob requests a new job from the scheduler
GetJob(ctx context.Context) (*Job, bool, error)
// CompleteJob marks a job as finished
CompleteJob(ctx context.Context, job *Job) error
CompleteJob(ctx context.Context, job *Job, success bool) error
// SyncJob informs the scheduler about an in-progress job
SyncJob(ctx context.Context, job *Job) error
}
Expand Down Expand Up @@ -52,9 +52,9 @@ type GetJobResponse struct {
}

type CompleteJobRequest struct {
BuilderID string
Job *Job
LastConsumedOffset int64
BuilderID string
Job *Job
Success bool
}

type SyncJobRequest struct {
Expand Down
98 changes: 52 additions & 46 deletions pkg/blockbuilder/types/proto/blockbuilder.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/blockbuilder/types/proto/blockbuilder.proto
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ message GetJobResponse {
message CompleteJobRequest {
string builder_id = 1;
Job job = 2;
int64 LastConsumedOffset = 3;
bool success = 3;
}

// CompleteJobResponse is an empty response for job completion
Expand Down

0 comments on commit 0ddaca8

Please sign in to comment.