Skip to content

Commit

Permalink
[query] Use OTEL's helpers for grpc server (jaegertracing#6055)
Browse files Browse the repository at this point in the history
<!--
!! Please DELETE this comment before posting.
We appreciate your contribution to the Jaeger project! πŸ‘‹πŸŽ‰
-->

## Which problem is this PR solving?
- Towards jaegertracing#6026 

## Description of the changes
- Migrates the GRPC query server to create the server using OTEL rather
than using a custom implementation
- Adds a log to warn users that having the same port for GRPC and HTTP
is now deprecated. We'll be removing this functionality after Feb 2025.

## How was this change tested?
- Unit tests / CI

## Checklist
- [x] I have read
https://github.com/jaegertracing/jaeger/blob/master/CONTRIBUTING_GUIDELINES.md
- [x] I have signed all commits
- [x] I have added unit tests for the new functionality
- [x] I have run lint and test steps successfully
  - for `jaeger`: `make lint test`
  - for `jaeger-ui`: `yarn lint` and `yarn test`

---------

Signed-off-by: Mahad Zaryab <[email protected]>
Signed-off-by: Mahad Zaryab <[email protected]>
Signed-off-by: Yuri Shkuro <[email protected]>
Co-authored-by: Yuri Shkuro <[email protected]>
  • Loading branch information
mahadzaryab1 and yurishkuro authored Oct 24, 2024
1 parent f9474f9 commit c96790a
Show file tree
Hide file tree
Showing 11 changed files with 175 additions and 71 deletions.
7 changes: 4 additions & 3 deletions cmd/all-in-one/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,8 @@ by default uses only in-memory database.`,
if err != nil {
logger.Fatal("Failed to initialize collector", zap.Error(err))
}
qOpts, err := new(queryApp.QueryOptions).InitFromViper(v, logger)
defaultOpts := queryApp.DefaultQueryOptions()
qOpts, err := defaultOpts.InitFromViper(v, logger)
if err != nil {
logger.Fatal("Failed to configure query service", zap.Error(err))
}
Expand Down Expand Up @@ -220,11 +221,11 @@ func startQuery(
spanReader = storageMetrics.NewReadMetricsDecorator(spanReader, telset.Metrics)
qs := querysvc.NewQueryService(spanReader, depReader, *queryOpts)

server, err := queryApp.NewServer(qs, metricsQueryService, qOpts, tm, telset)
server, err := queryApp.NewServer(context.Background(), qs, metricsQueryService, qOpts, tm, telset)
if err != nil {
svc.Logger.Fatal("Could not create jaeger-query", zap.Error(err))
}
if err := server.Start(); err != nil {
if err := server.Start(context.Background()); err != nil {
svc.Logger.Fatal("Could not start jaeger-query", zap.Error(err))
}

Expand Down
16 changes: 1 addition & 15 deletions cmd/jaeger/internal/extension/jaegerquery/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,9 @@ import (
"context"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configgrpc"
"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/config/confignet"
"go.opentelemetry.io/collector/extension"

"github.com/jaegertracing/jaeger/cmd/query/app"
"github.com/jaegertracing/jaeger/ports"
)

// componentType is the name of this extension in configuration.
Expand All @@ -28,17 +24,7 @@ func NewFactory() extension.Factory {

func createDefaultConfig() component.Config {
return &Config{
QueryOptions: app.QueryOptions{
HTTP: confighttp.ServerConfig{
Endpoint: ports.PortToHostPort(ports.QueryHTTP),
},
GRPC: configgrpc.ServerConfig{
NetAddr: confignet.AddrConfig{
Endpoint: ports.PortToHostPort(ports.QueryGRPC),
Transport: confignet.TransportTypeTCP,
},
},
},
QueryOptions: app.DefaultQueryOptions(),
}
}

Expand Down
8 changes: 4 additions & 4 deletions cmd/jaeger/internal/extension/jaegerquery/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func (*server) Dependencies() []component.ID {
return []component.ID{jaegerstorage.ID}
}

func (s *server) Start(_ context.Context, host component.Host) error {
func (s *server) Start(ctx context.Context, host component.Host) error {
mf := otelmetrics.NewFactory(s.telset.MeterProvider)
baseFactory := mf.Namespace(metrics.NSOptions{Name: "jaeger"})
queryMetricsFactory := baseFactory.Namespace(metrics.NSOptions{Name: "query"})
Expand Down Expand Up @@ -100,11 +100,11 @@ func (s *server) Start(_ context.Context, host component.Host) error {
ReportStatus: func(event *componentstatus.Event) {
componentstatus.ReportStatus(host, event)
},
Host: host,
}

// TODO contextcheck linter complains about next line that context is not passed. It is not wrong.
//nolint
s.server, err = queryApp.NewServer(
ctx,
// TODO propagate healthcheck updates up to the collector's runtime
qs,
mqs,
Expand All @@ -116,7 +116,7 @@ func (s *server) Start(_ context.Context, host component.Host) error {
return fmt.Errorf("could not create jaeger-query: %w", err)
}

if err := s.server.Start(); err != nil {
if err := s.server.Start(ctx); err != nil {
return fmt.Errorf("could not start jaeger-query: %w", err)
}

Expand Down
2 changes: 2 additions & 0 deletions cmd/jaeger/internal/extension/jaegerquery/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/confignet"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/otel/metric"
noopmetric "go.opentelemetry.io/otel/metric/noop"
Expand Down Expand Up @@ -211,6 +212,7 @@ func TestServerStart(t *testing.T) {
}
tt.config.HTTP.Endpoint = ":0"
tt.config.GRPC.NetAddr.Endpoint = ":0"
tt.config.GRPC.NetAddr.Transport = confignet.TransportTypeTCP
server := newServer(tt.config, telemetrySettings)
err := server.Start(context.Background(), host)
if tt.expectedErr == "" {
Expand Down
20 changes: 20 additions & 0 deletions cmd/query/app/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/spf13/viper"
"go.opentelemetry.io/collector/config/configgrpc"
"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/config/confignet"
"go.opentelemetry.io/collector/config/configopaque"
"go.uber.org/zap"

Expand Down Expand Up @@ -100,6 +101,11 @@ func AddFlags(flagSet *flag.FlagSet) {
func (qOpts *QueryOptions) InitFromViper(v *viper.Viper, logger *zap.Logger) (*QueryOptions, error) {
qOpts.HTTP.Endpoint = v.GetString(queryHTTPHostPort)
qOpts.GRPC.NetAddr.Endpoint = v.GetString(queryGRPCHostPort)
// TODO: drop support for same host ports
// https://github.com/jaegertracing/jaeger/issues/6117
if qOpts.HTTP.Endpoint == qOpts.GRPC.NetAddr.Endpoint {
logger.Warn("using the same port for gRPC and HTTP is deprecated; please use dedicated ports instead; support for shared ports will be removed in Feb 2025")
}
tlsGrpc, err := tlsGRPCFlagsConfig.InitFromViper(v)
if err != nil {
return qOpts, fmt.Errorf("failed to process gRPC TLS options: %w", err)
Expand Down Expand Up @@ -169,3 +175,17 @@ func mapHTTPHeaderToOTELHeaders(h http.Header) map[string]configopaque.String {

return otelHeaders
}

func DefaultQueryOptions() QueryOptions {
return QueryOptions{
HTTP: confighttp.ServerConfig{
Endpoint: ports.PortToHostPort(ports.QueryHTTP),
},
GRPC: configgrpc.ServerConfig{
NetAddr: confignet.AddrConfig{
Endpoint: ports.PortToHostPort(ports.QueryGRPC),
Transport: confignet.TransportTypeTCP,
},
},
}
}
25 changes: 25 additions & 0 deletions cmd/query/app/flags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/pkg/config"
"github.com/jaegertracing/jaeger/pkg/testutils"
"github.com/jaegertracing/jaeger/ports"
"github.com/jaegertracing/jaeger/storage/mocks"
spanstore_mocks "github.com/jaegertracing/jaeger/storage/spanstore/mocks"
Expand Down Expand Up @@ -183,3 +184,27 @@ func TestQueryOptions_FailedTLSFlags(t *testing.T) {
})
}
}

func TestQueryOptions_SamePortsLogsWarning(t *testing.T) {
logger, logBuf := testutils.NewLogger()
v, command := config.Viperize(AddFlags)
command.ParseFlags([]string{
"--query.http-server.host-port=127.0.0.1:8081",
"--query.grpc-server.host-port=127.0.0.1:8081",
})
_, err := new(QueryOptions).InitFromViper(v, logger)
require.NoError(t, err)

require.Contains(
t,
logBuf.String(),
"using the same port for gRPC and HTTP is deprecated",
)
}

func TestDefaultQueryOptions(t *testing.T) {
qo := DefaultQueryOptions()
require.Equal(t, ":16686", qo.HTTP.Endpoint)
require.Equal(t, ":16685", qo.GRPC.NetAddr.Endpoint)
require.EqualValues(t, "tcp", qo.GRPC.NetAddr.Transport)
}
77 changes: 65 additions & 12 deletions cmd/query/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,12 @@ import (

"github.com/gorilla/handlers"
"github.com/soheilhy/cmux"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componentstatus"
"go.opentelemetry.io/collector/config/configgrpc"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/noop"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"google.golang.org/grpc"
Expand Down Expand Up @@ -54,7 +59,9 @@ type Server struct {
}

// NewServer creates and initializes Server
func NewServer(querySvc *querysvc.QueryService,
func NewServer(
ctx context.Context,
querySvc *querysvc.QueryService,
metricsQuerySvc querysvc.MetricsQueryService,
options *QueryOptions,
tm *tenancy.Manager,
Expand All @@ -74,12 +81,18 @@ func NewServer(querySvc *querysvc.QueryService,
return nil, errors.New("server with TLS enabled can not use same host ports for gRPC and HTTP. Use dedicated HTTP and gRPC host ports instead")
}

grpcServer, err := createGRPCServer(querySvc, metricsQuerySvc, options, tm, telset)
var grpcServer *grpc.Server
if separatePorts {
grpcServer, err = createGRPCServerLegacy(ctx, options, tm)
} else {
grpcServer, err = createGRPCServerOTEL(ctx, options, tm, telset)
}
if err != nil {
return nil, err
}
registerGRPCHandlers(grpcServer, querySvc, metricsQuerySvc, telset)

httpServer, err := createHTTPServer(querySvc, metricsQuerySvc, options, tm, telset)
httpServer, err := createHTTPServer(ctx, querySvc, metricsQuerySvc, options, tm, telset)
if err != nil {
return nil, err
}
Expand All @@ -94,11 +107,15 @@ func NewServer(querySvc *querysvc.QueryService,
}, nil
}

func createGRPCServer(querySvc *querysvc.QueryService, metricsQuerySvc querysvc.MetricsQueryService, options *QueryOptions, tm *tenancy.Manager, telset telemetery.Setting) (*grpc.Server, error) {
func createGRPCServerLegacy(
ctx context.Context,
options *QueryOptions,
tm *tenancy.Manager,
) (*grpc.Server, error) {
var grpcOpts []grpc.ServerOption

if options.GRPC.TLSSetting != nil {
tlsCfg, err := options.GRPC.TLSSetting.LoadTLSConfig(context.Background())
tlsCfg, err := options.GRPC.TLSSetting.LoadTLSConfig(ctx)
if err != nil {
return nil, err
}
Expand All @@ -108,15 +125,24 @@ func createGRPCServer(querySvc *querysvc.QueryService, metricsQuerySvc querysvc.
grpcOpts = append(grpcOpts, grpc.Creds(creds))
}
if tm.Enabled {
//nolint:contextcheck
grpcOpts = append(grpcOpts,
grpc.StreamInterceptor(tenancy.NewGuardingStreamInterceptor(tm)),
grpc.UnaryInterceptor(tenancy.NewGuardingUnaryInterceptor(tm)),
)
}

server := grpc.NewServer(grpcOpts...)
reflection.Register(server)
return server, nil
}

func registerGRPCHandlers(
server *grpc.Server,
querySvc *querysvc.QueryService,
metricsQuerySvc querysvc.MetricsQueryService,
telset telemetery.Setting,
) {
reflection.Register(server)
handler := NewGRPCHandler(querySvc, metricsQuerySvc, GRPCHandlerOptions{
Logger: telset.Logger,
})
Expand All @@ -131,7 +157,33 @@ func createGRPCServer(querySvc *querysvc.QueryService, metricsQuerySvc querysvc.
healthServer.SetServingStatus("jaeger.api_v3.QueryService", grpc_health_v1.HealthCheckResponse_SERVING)

grpc_health_v1.RegisterHealthServer(server, healthServer)
return server, nil
}

func createGRPCServerOTEL(
ctx context.Context,
options *QueryOptions,
tm *tenancy.Manager,
telset telemetery.Setting,
) (*grpc.Server, error) {
var grpcOpts []configgrpc.ToServerOption
if tm.Enabled {
//nolint:contextcheck
grpcOpts = append(grpcOpts,
configgrpc.WithGrpcServerOption(grpc.StreamInterceptor(tenancy.NewGuardingStreamInterceptor(tm))),
configgrpc.WithGrpcServerOption(grpc.UnaryInterceptor(tenancy.NewGuardingUnaryInterceptor(tm))),
)
}
return options.GRPC.ToServer(
ctx,
telset.Host,
component.TelemetrySettings{
Logger: telset.Logger,
TracerProvider: telset.TracerProvider,
LeveledMeterProvider: func(_ configtelemetry.Level) metric.MeterProvider {
return noop.NewMeterProvider()
},
},
grpcOpts...)
}

type httpServer struct {
Expand All @@ -142,6 +194,7 @@ type httpServer struct {
var _ io.Closer = (*httpServer)(nil)

func createHTTPServer(
ctx context.Context,
querySvc *querysvc.QueryService,
metricsQuerySvc querysvc.MetricsQueryService,
queryOpts *QueryOptions,
Expand Down Expand Up @@ -189,7 +242,7 @@ func createHTTPServer(
}

if queryOpts.HTTP.TLSSetting != nil {
tlsCfg, err := queryOpts.HTTP.TLSSetting.LoadTLSConfig(context.Background()) // This checks if the certificates are correctly provided
tlsCfg, err := queryOpts.HTTP.TLSSetting.LoadTLSConfig(ctx) // This checks if the certificates are correctly provided
if err != nil {
return nil, err
}
Expand All @@ -209,10 +262,10 @@ func (hS httpServer) Close() error {
}

// initListener initialises listeners of the server
func (s *Server) initListener() (cmux.CMux, error) {
func (s *Server) initListener(ctx context.Context) (cmux.CMux, error) {
if s.separatePorts { // use separate ports and listeners each for gRPC and HTTP requests
var err error
s.grpcConn, err = net.Listen("tcp", s.queryOptions.GRPC.NetAddr.Endpoint)
s.grpcConn, err = s.queryOptions.GRPC.NetAddr.Listen(ctx)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -260,8 +313,8 @@ func (s *Server) initListener() (cmux.CMux, error) {
}

// Start http, GRPC and cmux servers concurrently
func (s *Server) Start() error {
cmuxServer, err := s.initListener()
func (s *Server) Start(ctx context.Context) error {
cmuxServer, err := s.initListener(ctx)
if err != nil {
return fmt.Errorf("query server failed to initialize listener: %w", err)
}
Expand Down
Loading

0 comments on commit c96790a

Please sign in to comment.