Skip to content

Commit

Permalink
[remote-storage] Use OTEL helper instead of tlscfg (#6351)
Browse files Browse the repository at this point in the history
## Which problem is this PR solving?
- Part of #4316

## Description of the changes
- 

## How was this change tested?
- 

## Checklist
- [ ] I have read
https://github.com/jaegertracing/jaeger/blob/master/CONTRIBUTING_GUIDELINES.md
- [ ] I have signed all commits
- [ ] I have added unit tests for the new functionality
- [ ] I have run lint and test steps successfully
  - for `jaeger`: `make lint test`
  - for `jaeger-ui`: `yarn lint` and `yarn test`

---------

Signed-off-by: chahatsagarmain <[email protected]>
Signed-off-by: Yuri Shkuro <[email protected]>
Co-authored-by: Yuri Shkuro <[email protected]>
Co-authored-by: Yuri Shkuro <[email protected]>
  • Loading branch information
3 people authored Dec 13, 2024
1 parent 0833839 commit 737be78
Show file tree
Hide file tree
Showing 6 changed files with 141 additions and 118 deletions.
13 changes: 5 additions & 8 deletions cmd/remote-storage/app/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"fmt"

"github.com/spf13/viper"
"go.uber.org/zap"
"go.opentelemetry.io/collector/config/configgrpc"

"github.com/jaegertracing/jaeger/pkg/config/tlscfg"
"github.com/jaegertracing/jaeger/pkg/tenancy"
Expand All @@ -25,10 +25,7 @@ var tlsGRPCFlagsConfig = tlscfg.ServerFlagsConfig{

// Options holds configuration for remote-storage service.
type Options struct {
// GRPCHostPort is the host:port address for gRPC server
GRPCHostPort string
// TLSGRPC configures secure transport
TLSGRPC tlscfg.Options
configgrpc.ServerConfig
// Tenancy configuration
Tenancy tenancy.Options
}
Expand All @@ -41,13 +38,13 @@ func AddFlags(flagSet *flag.FlagSet) {
}

// InitFromViper initializes Options with properties from CLI flags.
func (o *Options) InitFromViper(v *viper.Viper, _ *zap.Logger) (*Options, error) {
o.GRPCHostPort = v.GetString(flagGRPCHostPort)
func (o *Options) InitFromViper(v *viper.Viper) (*Options, error) {
o.NetAddr.Endpoint = v.GetString(flagGRPCHostPort)
tlsGrpc, err := tlsGRPCFlagsConfig.InitFromViper(v)
if err != nil {
return o, fmt.Errorf("failed to process gRPC TLS options: %w", err)
}
o.TLSGRPC = tlsGrpc
o.TLSSetting = tlsGrpc.ToOtelServerConfig()
o.Tenancy = tenancy.InitFromViper(v)
return o, nil
}
7 changes: 3 additions & 4 deletions cmd/remote-storage/app/flags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/pkg/config"
)
Expand All @@ -18,9 +17,9 @@ func TestFlags(t *testing.T) {
command.ParseFlags([]string{
"--grpc.host-port=127.0.0.1:8081",
})
qOpts, err := new(Options).InitFromViper(v, zap.NewNop())
qOpts, err := new(Options).InitFromViper(v)
require.NoError(t, err)
assert.Equal(t, "127.0.0.1:8081", qOpts.GRPCHostPort)
assert.Equal(t, "127.0.0.1:8081", qOpts.NetAddr.Endpoint)
}

func TestFailedTLSFlags(t *testing.T) {
Expand All @@ -30,6 +29,6 @@ func TestFailedTLSFlags(t *testing.T) {
"--grpc.tls.cert=blah", // invalid unless tls.enabled
})
require.NoError(t, err)
_, err = new(Options).InitFromViper(v, zap.NewNop())
_, err = new(Options).InitFromViper(v)
assert.ErrorContains(t, err, "failed to process gRPC TLS options")
}
54 changes: 23 additions & 31 deletions cmd/remote-storage/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,16 @@
package app

import (
"context"
"fmt"
"net"
"sync"

"go.opentelemetry.io/collector/component/componentstatus"
"go.opentelemetry.io/collector/config/configgrpc"
"go.opentelemetry.io/collector/config/confignet"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/health"
"google.golang.org/grpc/reflection"

Expand All @@ -27,11 +29,10 @@ import (

// Server runs a gRPC server
type Server struct {
opts *Options

opts *Options
grpcConn net.Listener
grpcServer *grpc.Server
wg sync.WaitGroup
stopped sync.WaitGroup
telset telemetry.Settings
}

Expand All @@ -42,7 +43,7 @@ func NewServer(options *Options, storageFactory storage.BaseFactory, tm *tenancy
return nil, err
}

grpcServer, err := createGRPCServer(options, tm, handler, telset.Logger)
grpcServer, err := createGRPCServer(options, tm, handler, telset)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -86,18 +87,7 @@ func createGRPCHandler(f storage.BaseFactory, logger *zap.Logger) (*shared.GRPCH
return handler, nil
}

func createGRPCServer(opts *Options, tm *tenancy.Manager, handler *shared.GRPCHandler, logger *zap.Logger) (*grpc.Server, error) {
var grpcOpts []grpc.ServerOption

if opts.TLSGRPC.Enabled {
tlsCfg, err := opts.TLSGRPC.Config(logger)
if err != nil {
return nil, fmt.Errorf("invalid TLS config: %w", err)
}
creds := credentials.NewTLS(tlsCfg)
grpcOpts = append(grpcOpts, grpc.Creds(creds))
}

func createGRPCServer(opts *Options, tm *tenancy.Manager, handler *shared.GRPCHandler, telset telemetry.Settings) (*grpc.Server, error) {
unaryInterceptors := []grpc.UnaryServerInterceptor{
bearertoken.NewUnaryServerInterceptor(),
}
Expand All @@ -109,12 +99,16 @@ func createGRPCServer(opts *Options, tm *tenancy.Manager, handler *shared.GRPCHa
streamInterceptors = append(streamInterceptors, tenancy.NewGuardingStreamInterceptor(tm))
}

grpcOpts = append(grpcOpts,
grpc.ChainUnaryInterceptor(unaryInterceptors...),
grpc.ChainStreamInterceptor(streamInterceptors...),
opts.NetAddr.Transport = confignet.TransportTypeTCP
server, err := opts.ToServer(context.Background(),
telset.Host,
telset.ToOtelComponent(),
configgrpc.WithGrpcServerOption(grpc.ChainUnaryInterceptor(unaryInterceptors...)),
configgrpc.WithGrpcServerOption(grpc.ChainStreamInterceptor(streamInterceptors...)),
)

server := grpc.NewServer(grpcOpts...)
if err != nil {
return nil, fmt.Errorf("failed to create gRPC server: %w", err)
}
healthServer := health.NewServer()
reflection.Register(server)
handler.Register(server, healthServer)
Expand All @@ -124,15 +118,15 @@ func createGRPCServer(opts *Options, tm *tenancy.Manager, handler *shared.GRPCHa

// Start gRPC server concurrently
func (s *Server) Start() error {
listener, err := net.Listen("tcp", s.opts.GRPCHostPort)
var err error
s.grpcConn, err = s.opts.NetAddr.Listen(context.Background())
if err != nil {
return err
return fmt.Errorf("failed to listen on gRPC port: %w", err)
}
s.telset.Logger.Info("Starting GRPC server", zap.Stringer("addr", listener.Addr()))
s.grpcConn = listener
s.wg.Add(1)
s.telset.Logger.Info("Starting GRPC server", zap.Stringer("addr", s.grpcConn.Addr()))
s.stopped.Add(1)
go func() {
defer s.wg.Done()
defer s.stopped.Done()
if err := s.grpcServer.Serve(s.grpcConn); err != nil {
s.telset.Logger.Error("GRPC server exited", zap.Error(err))
s.telset.ReportStatus(componentstatus.NewFatalErrorEvent(err))
Expand All @@ -145,9 +139,7 @@ func (s *Server) Start() error {
// Close stops http, GRPC servers and closes the port listener.
func (s *Server) Close() error {
s.grpcServer.Stop()
s.grpcConn.Close()
s.opts.TLSGRPC.Close()
s.wg.Wait()
s.stopped.Wait()
s.telset.ReportStatus(componentstatus.NewEvent(componentstatus.StatusStopped))
return nil
}
Loading

0 comments on commit 737be78

Please sign in to comment.