Skip to content

Commit

Permalink
use otel helper
Browse files Browse the repository at this point in the history
Signed-off-by: chahatsagarmain <[email protected]>
  • Loading branch information
chahatsagarmain committed Dec 12, 2024
1 parent d69dad5 commit ae7bf84
Show file tree
Hide file tree
Showing 6 changed files with 143 additions and 110 deletions.
19 changes: 11 additions & 8 deletions cmd/remote-storage/app/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ import (
"fmt"

"github.com/spf13/viper"
"go.uber.org/zap"
"go.opentelemetry.io/collector/config/configgrpc"
"go.opentelemetry.io/collector/config/confignet"

"github.com/jaegertracing/jaeger/pkg/config/tlscfg"
"github.com/jaegertracing/jaeger/pkg/tenancy"
Expand All @@ -25,10 +26,7 @@ var tlsGRPCFlagsConfig = tlscfg.ServerFlagsConfig{

// Options holds configuration for remote-storage service.
type Options struct {
// GRPCHostPort is the host:port address for gRPC server
GRPCHostPort string
// TLSGRPC configures secure transport
TLSGRPC tlscfg.Options
GRPCCFG *configgrpc.ServerConfig
// Tenancy configuration
Tenancy tenancy.Options
}
Expand All @@ -41,13 +39,18 @@ func AddFlags(flagSet *flag.FlagSet) {
}

// InitFromViper initializes Options with properties from CLI flags.
func (o *Options) InitFromViper(v *viper.Viper, _ *zap.Logger) (*Options, error) {
o.GRPCHostPort = v.GetString(flagGRPCHostPort)
func (o *Options) InitFromViper(v *viper.Viper) (*Options, error) {
grpcEndpoint := v.GetString(flagGRPCHostPort)
o.GRPCCFG = &configgrpc.ServerConfig{
NetAddr: confignet.AddrConfig{
Endpoint: grpcEndpoint,
},
}
tlsGrpc, err := tlsGRPCFlagsConfig.InitFromViper(v)
if err != nil {
return o, fmt.Errorf("failed to process gRPC TLS options: %w", err)
}
o.TLSGRPC = tlsGrpc
o.GRPCCFG.TLSSetting = tlsGrpc.ToOtelServerConfig()
o.Tenancy = tenancy.InitFromViper(v)
return o, nil
}
7 changes: 3 additions & 4 deletions cmd/remote-storage/app/flags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/pkg/config"
)
Expand All @@ -18,9 +17,9 @@ func TestFlags(t *testing.T) {
command.ParseFlags([]string{
"--grpc.host-port=127.0.0.1:8081",
})
qOpts, err := new(Options).InitFromViper(v, zap.NewNop())
qOpts, err := new(Options).InitFromViper(v)
require.NoError(t, err)
assert.Equal(t, "127.0.0.1:8081", qOpts.GRPCHostPort)
assert.Equal(t, "127.0.0.1:8081", qOpts.GRPCCFG.NetAddr.Endpoint)
}

func TestFailedTLSFlags(t *testing.T) {
Expand All @@ -30,6 +29,6 @@ func TestFailedTLSFlags(t *testing.T) {
"--grpc.tls.cert=blah", // invalid unless tls.enabled
})
require.NoError(t, err)
_, err = new(Options).InitFromViper(v, zap.NewNop())
_, err = new(Options).InitFromViper(v)
assert.ErrorContains(t, err, "failed to process gRPC TLS options")
}
45 changes: 20 additions & 25 deletions cmd/remote-storage/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@
package app

import (
"fmt"
"context"
"net"
"sync"

"go.opentelemetry.io/collector/component/componentstatus"
"go.opentelemetry.io/collector/config/configgrpc"
"go.opentelemetry.io/collector/config/confignet"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/health"
"google.golang.org/grpc/reflection"

Expand All @@ -27,8 +28,7 @@ import (

// Server runs a gRPC server
type Server struct {
opts *Options

opts *Options
grpcConn net.Listener
grpcServer *grpc.Server
wg sync.WaitGroup
Expand All @@ -42,7 +42,7 @@ func NewServer(options *Options, storageFactory storage.BaseFactory, tm *tenancy
return nil, err
}

grpcServer, err := createGRPCServer(options, tm, handler, telset.Logger)
grpcServer, err := createGRPCServer(options, tm, handler, telset)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -86,17 +86,8 @@ func createGRPCHandler(f storage.BaseFactory, logger *zap.Logger) (*shared.GRPCH
return handler, nil
}

func createGRPCServer(opts *Options, tm *tenancy.Manager, handler *shared.GRPCHandler, logger *zap.Logger) (*grpc.Server, error) {
var grpcOpts []grpc.ServerOption

if opts.TLSGRPC.Enabled {
tlsCfg, err := opts.TLSGRPC.Config(logger)
if err != nil {
return nil, fmt.Errorf("invalid TLS config: %w", err)
}
creds := credentials.NewTLS(tlsCfg)
grpcOpts = append(grpcOpts, grpc.Creds(creds))
}
func createGRPCServer(opts *Options, tm *tenancy.Manager, handler *shared.GRPCHandler, telset telemetry.Settings) (*grpc.Server, error) {
var grpcOpts []configgrpc.ToServerOption

unaryInterceptors := []grpc.UnaryServerInterceptor{
bearertoken.NewUnaryServerInterceptor(),
Expand All @@ -110,11 +101,17 @@ func createGRPCServer(opts *Options, tm *tenancy.Manager, handler *shared.GRPCHa
}

grpcOpts = append(grpcOpts,
grpc.ChainUnaryInterceptor(unaryInterceptors...),
grpc.ChainStreamInterceptor(streamInterceptors...),
configgrpc.WithGrpcServerOption(grpc.ChainUnaryInterceptor(unaryInterceptors...)),
configgrpc.WithGrpcServerOption(grpc.ChainStreamInterceptor(streamInterceptors...)),
)

server := grpc.NewServer(grpcOpts...)
opts.GRPCCFG.NetAddr.Transport = confignet.TransportTypeTCP
server, err := opts.GRPCCFG.ToServer(context.Background(),
nil,
telset.ToOtelComponent(),
grpcOpts...)
if err != nil {
return nil, err
}
healthServer := health.NewServer()
reflection.Register(server)
handler.Register(server, healthServer)
Expand All @@ -124,12 +121,12 @@ func createGRPCServer(opts *Options, tm *tenancy.Manager, handler *shared.GRPCHa

// Start gRPC server concurrently
func (s *Server) Start() error {
listener, err := net.Listen("tcp", s.opts.GRPCHostPort)
var err error
s.grpcConn, err = s.opts.GRPCCFG.NetAddr.Listen(context.Background())
if err != nil {
return err
}
s.telset.Logger.Info("Starting GRPC server", zap.Stringer("addr", listener.Addr()))
s.grpcConn = listener
s.telset.Logger.Info("Starting GRPC server", zap.Stringer("addr", s.grpcConn.Addr()))
s.wg.Add(1)
go func() {
defer s.wg.Done()
Expand All @@ -145,8 +142,6 @@ func (s *Server) Start() error {
// Close stops http, GRPC servers and closes the port listener.
func (s *Server) Close() error {
s.grpcServer.Stop()
s.grpcConn.Close()
s.opts.TLSGRPC.Close()
s.wg.Wait()
s.telset.ReportStatus(componentstatus.NewEvent(componentstatus.StatusStopped))
return nil
Expand Down
Loading

0 comments on commit ae7bf84

Please sign in to comment.