Skip to content

Commit

Permalink
feat(blockbuilder): grpc transport (#15218)
Browse files Browse the repository at this point in the history
  • Loading branch information
owen-d authored Dec 3, 2024
1 parent ad322c0 commit 339ba1a
Show file tree
Hide file tree
Showing 7 changed files with 2,547 additions and 21 deletions.
8 changes: 4 additions & 4 deletions pkg/blockbuilder/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@ import (
type testEnv struct {
queue *JobQueue
scheduler *BlockScheduler
transport *builder.MemoryTransport
transport *types.MemoryTransport
builder *builder.Worker
}

func newTestEnv(builderID string) *testEnv {
queue := NewJobQueue()
scheduler := NewScheduler(Config{}, queue, nil, log.NewNopLogger(), prometheus.NewRegistry())
transport := builder.NewMemoryTransport(scheduler)
builder := builder.NewWorker(builderID, builder.NewMemoryTransport(scheduler))
transport := types.NewMemoryTransport(scheduler)
builder := builder.NewWorker(builderID, transport)

return &testEnv{
queue: queue,
Expand Down Expand Up @@ -89,7 +89,7 @@ func TestMultipleBuilders(t *testing.T) {
// Create first environment
env1 := newTestEnv("test-builder-1")
// Create second builder using same scheduler
builder2 := builder.NewWorker("test-builder-2", builder.NewMemoryTransport(env1.scheduler))
builder2 := builder.NewWorker("test-builder-2", env1.transport)

ctx := context.Background()

Expand Down
147 changes: 147 additions & 0 deletions pkg/blockbuilder/types/grpc_transport.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
package types

import (
"context"
"flag"
"io"

"github.com/grafana/dskit/grpcclient"
"github.com/grafana/dskit/instrument"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"google.golang.org/grpc"
"google.golang.org/grpc/health/grpc_health_v1"

"github.com/grafana/loki/v3/pkg/blockbuilder/types/proto"
"github.com/grafana/loki/v3/pkg/util/constants"
)

var _ Transport = &GRPCTransport{}

type GRPCTransportConfig struct {
Address string `yaml:"address,omitempty"`

// GRPCClientConfig configures the gRPC connection between the Bloom Gateway client and the server.
GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config"`
}

func (cfg *GRPCTransportConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.StringVar(&cfg.Address, prefix+"address", "", "address in DNS Service Discovery format: https://grafana.com/docs/mimir/latest/configure/about-dns-service-discovery/#supported-discovery-modes")
}

type grpcTransportMetrics struct {
requestLatency *prometheus.HistogramVec
}

func newGRPCTransportMetrics(registerer prometheus.Registerer) *grpcTransportMetrics {
return &grpcTransportMetrics{
requestLatency: promauto.With(registerer).NewHistogramVec(prometheus.HistogramOpts{
Namespace: constants.Loki,
Subsystem: "block_builder_grpc",
Name: "request_duration_seconds",
Help: "Time (in seconds) spent serving requests when using the block builder grpc transport",
Buckets: instrument.DefBuckets,
}, []string{"operation", "status_code"}),
}
}

// GRPCTransport implements the Transport interface using gRPC
type GRPCTransport struct {
grpc_health_v1.HealthClient
io.Closer
proto.BlockBuilderServiceClient
}

// NewGRPCTransportFromAddress creates a new gRPC transport instance from an address and dial options
func NewGRPCTransportFromAddress(
metrics *grpcTransportMetrics,
cfg GRPCTransportConfig,
) (*GRPCTransport, error) {

dialOpts, err := cfg.GRPCClientConfig.DialOption(grpcclient.Instrument(metrics.requestLatency))
if err != nil {
return nil, err
}

// nolint:staticcheck // grpc.Dial() has been deprecated; we'll address it before upgrading to gRPC 2.
conn, err := grpc.Dial(cfg.Address, dialOpts...)
if err != nil {
return nil, errors.Wrap(err, "new grpc pool dial")
}

return &GRPCTransport{
Closer: conn,
HealthClient: grpc_health_v1.NewHealthClient(conn),
BlockBuilderServiceClient: proto.NewBlockBuilderServiceClient(conn),
}, nil
}

// SendGetJobRequest implements Transport
func (t *GRPCTransport) SendGetJobRequest(ctx context.Context, req *GetJobRequest) (*GetJobResponse, error) {
protoReq := &proto.GetJobRequest{
BuilderId: req.BuilderID,
}

resp, err := t.GetJob(ctx, protoReq)
if err != nil {
return nil, err
}

return &GetJobResponse{
Job: protoToJob(resp.GetJob()),
OK: resp.GetOk(),
}, nil
}

// SendCompleteJob implements Transport
func (t *GRPCTransport) SendCompleteJob(ctx context.Context, req *CompleteJobRequest) error {
protoReq := &proto.CompleteJobRequest{
BuilderId: req.BuilderID,
Job: jobToProto(req.Job),
}

_, err := t.CompleteJob(ctx, protoReq)
return err
}

// SendSyncJob implements Transport
func (t *GRPCTransport) SendSyncJob(ctx context.Context, req *SyncJobRequest) error {
protoReq := &proto.SyncJobRequest{
BuilderId: req.BuilderID,
Job: jobToProto(req.Job),
}

_, err := t.SyncJob(ctx, protoReq)
return err
}

// protoToJob converts a proto Job to a types.Job
func protoToJob(p *proto.Job) *Job {
if p == nil {
return nil
}
return &Job{
ID: p.GetId(),
Partition: int(p.GetPartition()),
Offsets: Offsets{
Min: p.GetOffsets().GetMin(),
Max: p.GetOffsets().GetMax(),
},
}
}

// jobToProto converts a types.Job to a proto Job
func jobToProto(j *Job) *proto.Job {
if j == nil {
return nil
}
return &proto.Job{
Id: j.ID,
Partition: int32(j.Partition),
Offsets: &proto.Offsets{
Min: j.Offsets.Min,
Max: j.Offsets.Max,
},
}
}
9 changes: 9 additions & 0 deletions pkg/blockbuilder/types/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,15 @@ type Scheduler interface {

// Transport defines the interface for communication between block builders and scheduler
type Transport interface {
BuilderTransport
SchedulerTransport
}

// SchedulerTransport is for calls originating from the scheduler
type SchedulerTransport interface{}

// BuilderTransport is for calls originating from the builder
type BuilderTransport interface {
// SendGetJobRequest sends a request to get a new job
SendGetJobRequest(ctx context.Context, req *GetJobRequest) (*GetJobResponse, error)
// SendCompleteJob sends a job completion notification
Expand Down
4 changes: 1 addition & 3 deletions pkg/blockbuilder/types/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@ import "fmt"

// Job represents a block building task.
type Job struct {
ID string
Status JobStatus
ID string
// Partition and offset information
Partition int
Offsets Offsets
Expand All @@ -30,7 +29,6 @@ type Offsets struct {
func NewJob(partition int, offsets Offsets) *Job {
return &Job{
ID: GenerateJobID(partition, offsets),
Status: JobStatusPending,
Partition: partition,
Offsets: offsets,
}
Expand Down
Loading

0 comments on commit 339ba1a

Please sign in to comment.