From cedaeaaf6a80161ee32a5478793a0d0ae1e2a337 Mon Sep 17 00:00:00 2001 From: Yuri Shkuro Date: Fri, 29 Nov 2024 13:03:43 -0400 Subject: [PATCH] Improve telemetry.Settings (#6275) ## Which problem is this PR solving? - Part of #5633 ## Description of the changes - Rename `telemetry.Setting` to `telemetry.Settings` - Create helpers `NoopSettings()` and `FromOtelComponent()` - Remove `LeveledMeterProvider` which is deprecated in OTEL - Use `telset` in more places - Pull out `Initialize()` from `storage.Factory` ## How was this change tested? - CI ## Checklist - [ ] I have read https://github.com/jaegertracing/jaeger/blob/master/CONTRIBUTING_GUIDELINES.md - [ ] I have signed all commits - [ ] I have added unit tests for the new functionality - [ ] I have run lint and test steps successfully - for `jaeger`: `make lint test` - for `jaeger-ui`: `yarn lint` and `yarn test` --------- Signed-off-by: Yuri Shkuro --- cmd/all-in-one/main.go | 29 ++-- cmd/collector/main.go | 7 +- cmd/ingester/main.go | 7 +- .../internal/extension/jaegerquery/server.go | 49 +++---- .../extension/jaegerquery/server_test.go | 28 ++-- .../extension/jaegerstorage/extension.go | 25 +--- cmd/query/app/flags.go | 2 +- cmd/query/app/querysvc/query_service.go | 2 +- cmd/query/app/server.go | 62 ++++----- cmd/query/app/server_test.go | 20 +-- cmd/query/app/token_propagation_test.go | 19 +-- cmd/query/main.go | 31 +++-- cmd/remote-storage/app/server.go | 16 +-- cmd/remote-storage/app/server_test.go | 19 +-- cmd/remote-storage/main.go | 20 +-- crossdock/main.go | 4 +- pkg/telemetry/settings.go | 43 ++++-- pkg/telemetry/settings_test.go | 33 +++++ plugin/storage/grpc/factory.go | 25 ++-- plugin/storage/grpc/factory_test.go | 22 +--- .../integration/remote_memory_storage.go | 13 +- storage/factory.go | 23 ++-- storage/mocks/BaseFactory.go | 124 ++++++++++++++++++ 23 files changed, 382 insertions(+), 241 deletions(-) create mode 100644 storage/mocks/BaseFactory.go diff --git a/cmd/all-in-one/main.go b/cmd/all-in-one/main.go index 9b3c3c7c4b6..2bdae2beaf8 100644 --- a/cmd/all-in-one/main.go +++ b/cmd/all-in-one/main.go @@ -14,9 +14,7 @@ import ( "github.com/spf13/cobra" "github.com/spf13/viper" - "go.opentelemetry.io/collector/config/configtelemetry" - "go.opentelemetry.io/otel/metric" - "go.opentelemetry.io/otel/metric/noop" + noopmetric "go.opentelemetry.io/otel/metric/noop" _ "go.uber.org/automaxprocs" "go.uber.org/zap" @@ -95,8 +93,16 @@ by default uses only in-memory database.`, logger.Fatal("Failed to initialize tracer", zap.Error(err)) } + baseTelset := telemetry.Settings{ + Logger: svc.Logger, + TracerProvider: tracer.OTEL, + Metrics: baseFactory, + MeterProvider: noopmetric.NewMeterProvider(), + ReportStatus: telemetry.HCAdapter(svc.HC()), + } + storageFactory.InitFromViper(v, logger) - if err := storageFactory.Initialize(baseFactory, logger); err != nil { + if err := storageFactory.Initialize(baseTelset.Metrics, baseTelset.Logger); err != nil { logger.Fatal("Failed to init storage factory", zap.Error(err)) } @@ -159,20 +165,13 @@ by default uses only in-memory database.`, log.Fatal(err) } - telset := telemetry.Setting{ - Logger: svc.Logger, - TracerProvider: tracer.OTEL, - Metrics: queryMetricsFactory, - ReportStatus: telemetry.HCAdapter(svc.HC()), - LeveledMeterProvider: func(_ configtelemetry.Level) metric.MeterProvider { - return noop.NewMeterProvider() - }, - } // query + queryTelset := baseTelset // copy + queryTelset.Metrics = queryMetricsFactory querySrv := startQuery( svc, qOpts, qOpts.BuildQueryServiceOptions(storageFactory, logger), spanReader, dependencyReader, metricsQueryService, - tm, telset, + tm, queryTelset, ) svc.RunAndThen(func() { @@ -222,7 +221,7 @@ func startQuery( depReader dependencystore.Reader, metricsQueryService querysvc.MetricsQueryService, tm *tenancy.Manager, - telset telemetry.Setting, + telset telemetry.Settings, ) *queryApp.Server { spanReader = spanstoremetrics.NewReaderDecorator(spanReader, telset.Metrics) qs := querysvc.NewQueryService(spanReader, depReader, *queryOpts) diff --git a/cmd/collector/main.go b/cmd/collector/main.go index db658c209b4..86f26d1cb84 100644 --- a/cmd/collector/main.go +++ b/cmd/collector/main.go @@ -24,6 +24,7 @@ import ( "github.com/jaegertracing/jaeger/cmd/internal/status" "github.com/jaegertracing/jaeger/pkg/config" "github.com/jaegertracing/jaeger/pkg/metrics" + "github.com/jaegertracing/jaeger/pkg/telemetry" "github.com/jaegertracing/jaeger/pkg/tenancy" "github.com/jaegertracing/jaeger/pkg/version" ss "github.com/jaegertracing/jaeger/plugin/sampling/strategyprovider" @@ -63,8 +64,12 @@ func main() { metricsFactory := baseFactory.Namespace(metrics.NSOptions{Name: "collector"}) version.NewInfoMetrics(metricsFactory) + baseTelset := telemetry.NoopSettings() + baseTelset.Logger = svc.Logger + baseTelset.Metrics = baseFactory + storageFactory.InitFromViper(v, logger) - if err := storageFactory.Initialize(baseFactory, logger); err != nil { + if err := storageFactory.Initialize(baseTelset.Metrics, baseTelset.Logger); err != nil { logger.Fatal("Failed to init storage factory", zap.Error(err)) } spanWriter, err := storageFactory.CreateSpanWriter() diff --git a/cmd/ingester/main.go b/cmd/ingester/main.go index 39bb8c971a3..efc08cea61f 100644 --- a/cmd/ingester/main.go +++ b/cmd/ingester/main.go @@ -23,6 +23,7 @@ import ( "github.com/jaegertracing/jaeger/cmd/internal/status" "github.com/jaegertracing/jaeger/pkg/config" "github.com/jaegertracing/jaeger/pkg/metrics" + "github.com/jaegertracing/jaeger/pkg/telemetry" "github.com/jaegertracing/jaeger/pkg/version" "github.com/jaegertracing/jaeger/plugin/storage" "github.com/jaegertracing/jaeger/ports" @@ -50,8 +51,12 @@ func main() { metricsFactory := baseFactory.Namespace(metrics.NSOptions{Name: "ingester"}) version.NewInfoMetrics(metricsFactory) + baseTelset := telemetry.NoopSettings() + baseTelset.Logger = svc.Logger + baseTelset.Metrics = baseFactory + storageFactory.InitFromViper(v, logger) - if err := storageFactory.Initialize(baseFactory, logger); err != nil { + if err := storageFactory.Initialize(baseTelset.Metrics, baseTelset.Logger); err != nil { logger.Fatal("Failed to init storage factory", zap.Error(err)) } spanWriter, err := storageFactory.CreateSpanWriter() diff --git a/cmd/jaeger/internal/extension/jaegerquery/server.go b/cmd/jaeger/internal/extension/jaegerquery/server.go index 34ecbbc5823..6a9c2b22df4 100644 --- a/cmd/jaeger/internal/extension/jaegerquery/server.go +++ b/cmd/jaeger/internal/extension/jaegerquery/server.go @@ -9,7 +9,6 @@ import ( "fmt" "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/component/componentstatus" "go.opentelemetry.io/collector/extension" "go.opentelemetry.io/collector/extension/extensioncapabilities" @@ -53,9 +52,29 @@ func (*server) Dependencies() []component.ID { } func (s *server) Start(ctx context.Context, host component.Host) error { - mf := otelmetrics.NewFactory(s.telset.MeterProvider) - baseFactory := mf.Namespace(metrics.NSOptions{Name: "jaeger"}) - queryMetricsFactory := baseFactory.Namespace(metrics.NSOptions{Name: "query"}) + // TODO OTel-collector does not initialize the tracer currently + // https://github.com/open-telemetry/opentelemetry-collector/issues/7532 + //nolint + tracerProvider, err := jtracer.New("jaeger") + if err != nil { + return fmt.Errorf("could not initialize a tracer: %w", err) + } + // make sure to close the tracer if subsequent code exists with error + success := false + defer func(ctx context.Context) { + if success { + s.closeTracer = tracerProvider.Close + } else { + tracerProvider.Close(ctx) + } + }(ctx) + + telset := telemetry.FromOtelComponent(s.telset, host) + telset.TracerProvider = tracerProvider.OTEL + telset.Metrics = telset.Metrics. + Namespace(metrics.NSOptions{Name: "jaeger"}). + Namespace(metrics.NSOptions{Name: "query"}) + f, err := jaegerstorage.GetStorageFactory(s.config.Storage.TracesPrimary, host) if err != nil { return fmt.Errorf("cannot find primary storage %s: %w", s.config.Storage.TracesPrimary, err) @@ -66,7 +85,7 @@ func (s *server) Start(ctx context.Context, host component.Host) error { return fmt.Errorf("cannot create span reader: %w", err) } - spanReader = spanstoremetrics.NewReaderDecorator(spanReader, queryMetricsFactory) + spanReader = spanstoremetrics.NewReaderDecorator(spanReader, telset.Metrics) depReader, err := f.CreateDependencyReader() if err != nil { @@ -86,25 +105,6 @@ func (s *server) Start(ctx context.Context, host component.Host) error { tm := tenancy.NewManager(&s.config.Tenancy) - // TODO OTel-collector does not initialize the tracer currently - // https://github.com/open-telemetry/opentelemetry-collector/issues/7532 - //nolint - tracerProvider, err := jtracer.New("jaeger") - if err != nil { - return fmt.Errorf("could not initialize a tracer: %w", err) - } - s.closeTracer = tracerProvider.Close - telset := telemetry.Setting{ - Logger: s.telset.Logger, - TracerProvider: tracerProvider.OTEL, - Metrics: queryMetricsFactory, - ReportStatus: func(event *componentstatus.Event) { - componentstatus.ReportStatus(host, event) - }, - LeveledMeterProvider: s.telset.LeveledMeterProvider, - Host: host, - } - s.server, err = queryApp.NewServer( ctx, // TODO propagate healthcheck updates up to the collector's runtime @@ -122,6 +122,7 @@ func (s *server) Start(ctx context.Context, host component.Host) error { return fmt.Errorf("could not start jaeger-query: %w", err) } + success = true return nil } diff --git a/cmd/jaeger/internal/extension/jaegerquery/server_test.go b/cmd/jaeger/internal/extension/jaegerquery/server_test.go index 6fd95566e9f..54b3733f24c 100644 --- a/cmd/jaeger/internal/extension/jaegerquery/server_test.go +++ b/cmd/jaeger/internal/extension/jaegerquery/server_test.go @@ -17,9 +17,8 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/config/confignet" - "go.opentelemetry.io/collector/config/configtelemetry" - "go.opentelemetry.io/otel/metric" noopmetric "go.opentelemetry.io/otel/metric/noop" + nooptrace "go.opentelemetry.io/otel/trace/noop" "go.uber.org/zap" "go.uber.org/zap/zaptest" @@ -134,7 +133,7 @@ func TestServerStart(t *testing.T) { expectedErr string }{ { - name: "Non-empty config with fake storage host", + name: "Real server with non-empty config", config: &Config{ Storage: Storage{ TracesArchive: "jaeger_storage", @@ -204,15 +203,16 @@ func TestServerStart(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { + // Despite using Noop Tracer below, query service also creates jtracer. + // We want to prevent that tracer from sampling anything in this test. + t.Setenv("OTEL_TRACES_SAMPLER", "always_off") telemetrySettings := component.TelemetrySettings{ - Logger: zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller())), - LeveledMeterProvider: func(_ configtelemetry.Level) metric.MeterProvider { - return noopmetric.NewMeterProvider() - }, - MeterProvider: noopmetric.NewMeterProvider(), + Logger: zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller())), + MeterProvider: noopmetric.NewMeterProvider(), + TracerProvider: nooptrace.NewTracerProvider(), } - tt.config.HTTP.Endpoint = ":0" - tt.config.GRPC.NetAddr.Endpoint = ":0" + tt.config.HTTP.Endpoint = "localhost:0" + tt.config.GRPC.NetAddr.Endpoint = "localhost:0" tt.config.GRPC.NetAddr.Transport = confignet.TransportTypeTCP server := newServer(tt.config, telemetrySettings) err := server.Start(context.Background(), host) @@ -297,7 +297,9 @@ func TestServerAddArchiveStorage(t *testing.T) { t.Run(tt.name, func(t *testing.T) { logger, buf := testutils.NewLogger() telemetrySettings := component.TelemetrySettings{ - Logger: logger, + Logger: logger, + MeterProvider: noopmetric.NewMeterProvider(), + TracerProvider: nooptrace.NewTracerProvider(), } server := newServer(tt.config, telemetrySettings) if tt.extension != nil { @@ -347,7 +349,9 @@ func TestServerAddMetricsStorage(t *testing.T) { t.Run(tt.name, func(t *testing.T) { logger, buf := testutils.NewLogger() telemetrySettings := component.TelemetrySettings{ - Logger: logger, + Logger: logger, + MeterProvider: noopmetric.NewMeterProvider(), + TracerProvider: nooptrace.NewTracerProvider(), } server := newServer(tt.config, telemetrySettings) if tt.extension != nil { diff --git a/cmd/jaeger/internal/extension/jaegerstorage/extension.go b/cmd/jaeger/internal/extension/jaegerstorage/extension.go index a1f50de8ee2..e931b611de9 100644 --- a/cmd/jaeger/internal/extension/jaegerstorage/extension.go +++ b/cmd/jaeger/internal/extension/jaegerstorage/extension.go @@ -10,11 +10,8 @@ import ( "io" "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/config/configtelemetry" "go.opentelemetry.io/collector/extension" - "go.opentelemetry.io/otel/metric" - "github.com/jaegertracing/jaeger/internal/metrics/otelmetrics" "github.com/jaegertracing/jaeger/pkg/metrics" "github.com/jaegertracing/jaeger/pkg/telemetry" "github.com/jaegertracing/jaeger/plugin/metrics/prometheus" @@ -118,34 +115,26 @@ func newStorageExt(config *Config, telset component.TelemetrySettings) *storageE } func (s *storageExt) Start(_ context.Context, host component.Host) error { - baseFactory := otelmetrics.NewFactory(s.telset.MeterProvider) - mf := baseFactory.Namespace(metrics.NSOptions{Name: "jaeger"}) + telset := telemetry.FromOtelComponent(s.telset, host) + telset.Metrics = telset.Metrics.Namespace(metrics.NSOptions{Name: "jaeger"}) for storageName, cfg := range s.config.TraceBackends { s.telset.Logger.Sugar().Infof("Initializing storage '%s'", storageName) var factory storage.Factory var err error = errors.New("empty configuration") switch { case cfg.Memory != nil: - factory, err = memory.NewFactoryWithConfig(*cfg.Memory, mf, s.telset.Logger), nil + factory, err = memory.NewFactoryWithConfig(*cfg.Memory, telset.Metrics, s.telset.Logger), nil case cfg.Badger != nil: - factory, err = badger.NewFactoryWithConfig(*cfg.Badger, mf, s.telset.Logger) + factory, err = badger.NewFactoryWithConfig(*cfg.Badger, telset.Metrics, s.telset.Logger) case cfg.GRPC != nil: - telset := telemetry.Setting{ - Logger: s.telset.Logger, - Host: host, - Metrics: mf, - LeveledMeterProvider: func(_ configtelemetry.Level) metric.MeterProvider { - return s.telset.MeterProvider - }, - } //nolint: contextcheck factory, err = grpc.NewFactoryWithConfig(*cfg.GRPC, telset) case cfg.Cassandra != nil: - factory, err = cassandra.NewFactoryWithConfig(*cfg.Cassandra, mf, s.telset.Logger) + factory, err = cassandra.NewFactoryWithConfig(*cfg.Cassandra, telset.Metrics, s.telset.Logger) case cfg.Elasticsearch != nil: - factory, err = es.NewFactoryWithConfig(*cfg.Elasticsearch, mf, s.telset.Logger) + factory, err = es.NewFactoryWithConfig(*cfg.Elasticsearch, telset.Metrics, s.telset.Logger) case cfg.Opensearch != nil: - factory, err = es.NewFactoryWithConfig(*cfg.Opensearch, mf, s.telset.Logger) + factory, err = es.NewFactoryWithConfig(*cfg.Opensearch, telset.Metrics, s.telset.Logger) } if err != nil { return fmt.Errorf("failed to initialize storage '%s': %w", storageName, err) diff --git a/cmd/query/app/flags.go b/cmd/query/app/flags.go index df40f502ff2..2e4d128742b 100644 --- a/cmd/query/app/flags.go +++ b/cmd/query/app/flags.go @@ -136,7 +136,7 @@ func (qOpts *QueryOptions) InitFromViper(v *viper.Viper, logger *zap.Logger) (*Q } // BuildQueryServiceOptions creates a QueryServiceOptions struct with appropriate adjusters and archive config -func (qOpts *QueryOptions) BuildQueryServiceOptions(storageFactory storage.Factory, logger *zap.Logger) *querysvc.QueryServiceOptions { +func (qOpts *QueryOptions) BuildQueryServiceOptions(storageFactory storage.BaseFactory, logger *zap.Logger) *querysvc.QueryServiceOptions { opts := &querysvc.QueryServiceOptions{} if !opts.InitArchiveStorage(storageFactory, logger) { logger.Info("Archive storage not initialized") diff --git a/cmd/query/app/querysvc/query_service.go b/cmd/query/app/querysvc/query_service.go index 135ecc60bbe..ee91b45bb58 100644 --- a/cmd/query/app/querysvc/query_service.go +++ b/cmd/query/app/querysvc/query_service.go @@ -127,7 +127,7 @@ func (qs QueryService) GetCapabilities() StorageCapabilities { } // InitArchiveStorage tries to initialize archive storage reader/writer if storage factory supports them. -func (opts *QueryServiceOptions) InitArchiveStorage(storageFactory storage.Factory, logger *zap.Logger) bool { +func (opts *QueryServiceOptions) InitArchiveStorage(storageFactory storage.BaseFactory, logger *zap.Logger) bool { archiveFactory, ok := storageFactory.(storage.ArchiveFactory) if !ok { logger.Info("Archive storage not supported by the factory") diff --git a/cmd/query/app/server.go b/cmd/query/app/server.go index 3a40a428686..1047355acfd 100644 --- a/cmd/query/app/server.go +++ b/cmd/query/app/server.go @@ -48,7 +48,7 @@ type Server struct { httpServer *httpServer separatePorts bool bgFinished sync.WaitGroup - telemetry.Setting + telset telemetry.Settings } // NewServer creates and initializes Server @@ -58,7 +58,7 @@ func NewServer( metricsQuerySvc querysvc.MetricsQueryService, options *QueryOptions, tm *tenancy.Manager, - telset telemetry.Setting, + telset telemetry.Settings, ) (*Server, error) { _, httpPort, err := net.SplitHostPort(options.HTTP.Endpoint) if err != nil { @@ -90,7 +90,7 @@ func NewServer( grpcServer: grpcServer, httpServer: httpServer, separatePorts: separatePorts, - Setting: telset, + telset: telset, }, nil } @@ -98,7 +98,7 @@ func registerGRPCHandlers( server *grpc.Server, querySvc *querysvc.QueryService, metricsQuerySvc querysvc.MetricsQueryService, - telset telemetry.Setting, + telset telemetry.Settings, ) { reflection.Register(server) handler := NewGRPCHandler(querySvc, metricsQuerySvc, GRPCHandlerOptions{ @@ -121,7 +121,7 @@ func createGRPCServer( ctx context.Context, options *QueryOptions, tm *tenancy.Manager, - telset telemetry.Setting, + telset telemetry.Settings, ) (*grpc.Server, error) { var grpcOpts []configgrpc.ToServerOption unaryInterceptors := []grpc.UnaryServerInterceptor{ @@ -145,9 +145,9 @@ func createGRPCServer( ctx, telset.Host, component.TelemetrySettings{ - Logger: telset.Logger, - TracerProvider: telset.TracerProvider, - LeveledMeterProvider: telset.LeveledMeterProvider, + Logger: telset.Logger, + TracerProvider: telset.TracerProvider, + MeterProvider: telset.MeterProvider, }, grpcOpts...) } @@ -164,7 +164,7 @@ func initRouter( metricsQuerySvc querysvc.MetricsQueryService, queryOpts *QueryOptions, tenancyMgr *tenancy.Manager, - telset telemetry.Setting, + telset telemetry.Settings, ) (http.Handler, io.Closer) { apiHandlerOptions := []HandlerOption{ HandlerOptions.Logger(telset.Logger), @@ -206,7 +206,7 @@ func createHTTPServer( metricsQuerySvc querysvc.MetricsQueryService, queryOpts *QueryOptions, tm *tenancy.Manager, - telset telemetry.Setting, + telset telemetry.Settings, ) (*httpServer, error) { handler, staticHandlerCloser := initRouter(querySvc, metricsQuerySvc, queryOpts, tm, telset) handler = recoveryhandler.NewRecoveryHandler(telset.Logger, true)(handler) @@ -214,9 +214,9 @@ func createHTTPServer( ctx, telset.Host, component.TelemetrySettings{ - Logger: telset.Logger, - TracerProvider: telset.TracerProvider, - LeveledMeterProvider: telset.LeveledMeterProvider, + Logger: telset.Logger, + TracerProvider: telset.TracerProvider, + MeterProvider: telset.MeterProvider, }, handler, ) @@ -251,7 +251,7 @@ func (s *Server) initListener(ctx context.Context) (cmux.CMux, error) { if err != nil { return nil, err } - s.Logger.Info( + s.telset.Logger.Info( "Query server started", zap.String("http_addr", s.HTTPAddr()), zap.String("grpc_addr", s.GRPCAddr()), @@ -272,7 +272,7 @@ func (s *Server) initListener(ctx context.Context) (cmux.CMux, error) { tcpPort = port } - s.Logger.Info( + s.telset.Logger.Info( "Query server started", zap.Int("port", tcpPort), zap.String("addr", s.queryOptions.HTTP.Endpoint)) @@ -317,29 +317,29 @@ func (s *Server) Start(ctx context.Context) error { s.bgFinished.Add(1) go func() { defer s.bgFinished.Done() - s.Logger.Info("Starting HTTP server", zap.Int("port", httpPort), zap.String("addr", s.queryOptions.HTTP.Endpoint)) + s.telset.Logger.Info("Starting HTTP server", zap.Int("port", httpPort), zap.String("addr", s.queryOptions.HTTP.Endpoint)) err := s.httpServer.Serve(s.httpConn) if err != nil && !errors.Is(err, http.ErrServerClosed) && !errors.Is(err, cmux.ErrListenerClosed) && !errors.Is(err, cmux.ErrServerClosed) { - s.Logger.Error("Could not start HTTP server", zap.Error(err)) - s.ReportStatus(componentstatus.NewFatalErrorEvent(err)) + s.telset.Logger.Error("Could not start HTTP server", zap.Error(err)) + s.telset.ReportStatus(componentstatus.NewFatalErrorEvent(err)) return } - s.Logger.Info("HTTP server stopped", zap.Int("port", httpPort), zap.String("addr", s.queryOptions.HTTP.Endpoint)) + s.telset.Logger.Info("HTTP server stopped", zap.Int("port", httpPort), zap.String("addr", s.queryOptions.HTTP.Endpoint)) }() // Start GRPC server concurrently s.bgFinished.Add(1) go func() { defer s.bgFinished.Done() - s.Logger.Info("Starting GRPC server", zap.Int("port", grpcPort), zap.String("addr", s.queryOptions.GRPC.NetAddr.Endpoint)) + s.telset.Logger.Info("Starting GRPC server", zap.Int("port", grpcPort), zap.String("addr", s.queryOptions.GRPC.NetAddr.Endpoint)) err := s.grpcServer.Serve(s.grpcConn) if err != nil && !errors.Is(err, cmux.ErrListenerClosed) && !errors.Is(err, cmux.ErrServerClosed) { - s.Logger.Error("Could not start GRPC server", zap.Error(err)) - s.ReportStatus(componentstatus.NewFatalErrorEvent(err)) + s.telset.Logger.Error("Could not start GRPC server", zap.Error(err)) + s.telset.ReportStatus(componentstatus.NewFatalErrorEvent(err)) return } - s.Logger.Info("GRPC server stopped", zap.Int("port", grpcPort), zap.String("addr", s.queryOptions.GRPC.NetAddr.Endpoint)) + s.telset.Logger.Info("GRPC server stopped", zap.Int("port", grpcPort), zap.String("addr", s.queryOptions.GRPC.NetAddr.Endpoint)) }() // Start cmux server concurrently. @@ -347,16 +347,16 @@ func (s *Server) Start(ctx context.Context) error { s.bgFinished.Add(1) go func() { defer s.bgFinished.Done() - s.Logger.Info("Starting CMUX server", zap.Int("port", tcpPort), zap.String("addr", s.queryOptions.HTTP.Endpoint)) + s.telset.Logger.Info("Starting CMUX server", zap.Int("port", tcpPort), zap.String("addr", s.queryOptions.HTTP.Endpoint)) err := cmuxServer.Serve() // TODO: find a way to avoid string comparison. Even though cmux has ErrServerClosed, it's not returned here. if err != nil && !strings.Contains(err.Error(), "use of closed network connection") { - s.Logger.Error("Could not start multiplexed server", zap.Error(err)) - s.ReportStatus(componentstatus.NewFatalErrorEvent(err)) + s.telset.Logger.Error("Could not start multiplexed server", zap.Error(err)) + s.telset.ReportStatus(componentstatus.NewFatalErrorEvent(err)) return } - s.Logger.Info("CMUX server stopped", zap.Int("port", tcpPort), zap.String("addr", s.queryOptions.HTTP.Endpoint)) + s.telset.Logger.Info("CMUX server stopped", zap.Int("port", tcpPort), zap.String("addr", s.queryOptions.HTTP.Endpoint)) }() } return nil @@ -374,20 +374,20 @@ func (s *Server) GRPCAddr() string { func (s *Server) Close() error { var errs []error - s.Logger.Info("Closing HTTP server") + s.telset.Logger.Info("Closing HTTP server") if err := s.httpServer.Close(); err != nil { errs = append(errs, fmt.Errorf("failed to close HTTP server: %w", err)) } - s.Logger.Info("Stopping gRPC server") + s.telset.Logger.Info("Stopping gRPC server") s.grpcServer.Stop() if !s.separatePorts { - s.Logger.Info("Closing CMux server") + s.telset.Logger.Info("Closing CMux server") s.cmuxServer.Close() } s.bgFinished.Wait() - s.Logger.Info("Server stopped") + s.telset.Logger.Info("Server stopped") return errors.Join(errs...) } diff --git a/cmd/query/app/server_test.go b/cmd/query/app/server_test.go index 6c3a6ec918f..3f8e3749d46 100644 --- a/cmd/query/app/server_test.go +++ b/cmd/query/app/server_test.go @@ -16,14 +16,10 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" - "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/config/configgrpc" "go.opentelemetry.io/collector/config/confighttp" "go.opentelemetry.io/collector/config/confignet" - "go.opentelemetry.io/collector/config/configtelemetry" "go.opentelemetry.io/collector/config/configtls" - "go.opentelemetry.io/otel/metric" - "go.opentelemetry.io/otel/metric/noop" sdktrace "go.opentelemetry.io/otel/sdk/trace" "go.opentelemetry.io/otel/sdk/trace/tracetest" "go.uber.org/zap" @@ -49,16 +45,12 @@ import ( var testCertKeyLocation = "../../../pkg/config/tlscfg/testdata" -func initTelSet(logger *zap.Logger, tracerProvider *jtracer.JTracer, hc *healthcheck.HealthCheck) telemetry.Setting { - return telemetry.Setting{ - Logger: logger, - TracerProvider: tracerProvider.OTEL, - ReportStatus: telemetry.HCAdapter(hc), - Host: componenttest.NewNopHost(), - LeveledMeterProvider: func(_ configtelemetry.Level) metric.MeterProvider { - return noop.NewMeterProvider() - }, - } +func initTelSet(logger *zap.Logger, tracerProvider *jtracer.JTracer, hc *healthcheck.HealthCheck) telemetry.Settings { + telset := telemetry.NoopSettings() + telset.Logger = logger + telset.TracerProvider = tracerProvider.OTEL + telset.ReportStatus = telemetry.HCAdapter(hc) + return telset } func TestServerError(t *testing.T) { diff --git a/cmd/query/app/token_propagation_test.go b/cmd/query/app/token_propagation_test.go index 99b1ca2c030..73edd44cf2d 100644 --- a/cmd/query/app/token_propagation_test.go +++ b/cmd/query/app/token_propagation_test.go @@ -17,17 +17,12 @@ import ( "go.opentelemetry.io/collector/config/configgrpc" "go.opentelemetry.io/collector/config/confighttp" "go.opentelemetry.io/collector/config/confignet" - "go.opentelemetry.io/collector/config/configtelemetry" - "go.opentelemetry.io/otel/metric" - "go.opentelemetry.io/otel/metric/noop" "go.uber.org/zap/zaptest" "github.com/jaegertracing/jaeger/cmd/internal/flags" "github.com/jaegertracing/jaeger/cmd/query/app/querysvc" "github.com/jaegertracing/jaeger/pkg/bearertoken" "github.com/jaegertracing/jaeger/pkg/config" - "github.com/jaegertracing/jaeger/pkg/jtracer" - "github.com/jaegertracing/jaeger/pkg/metrics" "github.com/jaegertracing/jaeger/pkg/telemetry" "github.com/jaegertracing/jaeger/pkg/tenancy" "github.com/jaegertracing/jaeger/plugin/storage/es" @@ -71,6 +66,10 @@ func runQueryService(t *testing.T, esURL string) *Server { flagsSvc := flags.NewService(ports.QueryAdminHTTP) flagsSvc.Logger = zaptest.NewLogger(t) + telset := telemetry.NoopSettings() + telset.Logger = flagsSvc.Logger + telset.ReportStatus = telemetry.HCAdapter(flagsSvc.HC()) + f := es.NewFactory() v, command := config.Viperize(f.AddFlags) require.NoError(t, command.ParseFlags([]string{ @@ -81,21 +80,13 @@ func runQueryService(t *testing.T, esURL string) *Server { f.InitFromViper(v, flagsSvc.Logger) // set AllowTokenFromContext manually because we don't register the respective CLI flag from query svc f.Options.Primary.Authentication.BearerTokenAuthentication.AllowFromContext = true - require.NoError(t, f.Initialize(metrics.NullFactory, flagsSvc.Logger)) + require.NoError(t, f.Initialize(telset.Metrics, telset.Logger)) defer f.Close() spanReader, err := f.CreateSpanReader() require.NoError(t, err) querySvc := querysvc.NewQueryService(spanReader, nil, querysvc.QueryServiceOptions{}) - telset := telemetry.Setting{ - Logger: flagsSvc.Logger, - TracerProvider: jtracer.NoOp().OTEL, - ReportStatus: telemetry.HCAdapter(flagsSvc.HC()), - LeveledMeterProvider: func(_ configtelemetry.Level) metric.MeterProvider { - return noop.NewMeterProvider() - }, - } server, err := NewServer(context.Background(), querySvc, nil, &QueryOptions{ BearerTokenPropagation: true, diff --git a/cmd/query/main.go b/cmd/query/main.go index 0ffc57f263b..0376b7a60ee 100644 --- a/cmd/query/main.go +++ b/cmd/query/main.go @@ -12,9 +12,6 @@ import ( "github.com/spf13/cobra" "github.com/spf13/viper" - "go.opentelemetry.io/collector/config/configtelemetry" - "go.opentelemetry.io/otel/metric" - "go.opentelemetry.io/otel/metric/noop" _ "go.uber.org/automaxprocs" "go.uber.org/zap" @@ -81,10 +78,17 @@ func main() { } } + baseTelset := telemetry.Settings{ + Logger: logger, + Metrics: baseFactory, + TracerProvider: jt.OTEL, + ReportStatus: telemetry.HCAdapter(svc.HC()), + } + // TODO: Need to figure out set enable/disable propagation on storage plugins. v.Set(bearertoken.StoragePropagationKey, queryOpts.BearerTokenPropagation) storageFactory.InitFromViper(v, logger) - if err := storageFactory.Initialize(baseFactory, logger); err != nil { + if err := storageFactory.Initialize(baseTelset.Metrics, baseTelset.Logger); err != nil { logger.Fatal("Failed to init storage factory", zap.Error(err)) } spanReader, err := storageFactory.CreateSpanReader() @@ -107,15 +111,16 @@ func main() { dependencyReader, *queryServiceOptions) tm := tenancy.NewManager(&queryOpts.Tenancy) - telset := telemetry.Setting{ - Logger: logger, - TracerProvider: jt.OTEL, - ReportStatus: telemetry.HCAdapter(svc.HC()), - LeveledMeterProvider: func(_ configtelemetry.Level) metric.MeterProvider { - return noop.NewMeterProvider() - }, - } - server, err := app.NewServer(context.Background(), queryService, metricsQueryService, queryOpts, tm, telset) + telset := baseTelset // copy + telset.Metrics = metricsFactory + server, err := app.NewServer( + context.Background(), + queryService, + metricsQueryService, + queryOpts, + tm, + telset, + ) if err != nil { logger.Fatal("Failed to create server", zap.Error(err)) } diff --git a/cmd/remote-storage/app/server.go b/cmd/remote-storage/app/server.go index a6eb15a05df..546d5c0b16c 100644 --- a/cmd/remote-storage/app/server.go +++ b/cmd/remote-storage/app/server.go @@ -32,11 +32,11 @@ type Server struct { grpcConn net.Listener grpcServer *grpc.Server wg sync.WaitGroup - telemetry.Setting + telset telemetry.Settings } // NewServer creates and initializes Server. -func NewServer(options *Options, storageFactory storage.Factory, tm *tenancy.Manager, telset telemetry.Setting) (*Server, error) { +func NewServer(options *Options, storageFactory storage.BaseFactory, tm *tenancy.Manager, telset telemetry.Settings) (*Server, error) { handler, err := createGRPCHandler(storageFactory, telset.Logger) if err != nil { return nil, err @@ -50,11 +50,11 @@ func NewServer(options *Options, storageFactory storage.Factory, tm *tenancy.Man return &Server{ opts: options, grpcServer: grpcServer, - Setting: telset, + telset: telset, }, nil } -func createGRPCHandler(f storage.Factory, logger *zap.Logger) (*shared.GRPCHandler, error) { +func createGRPCHandler(f storage.BaseFactory, logger *zap.Logger) (*shared.GRPCHandler, error) { reader, err := f.CreateSpanReader() if err != nil { return nil, err @@ -128,14 +128,14 @@ func (s *Server) Start() error { if err != nil { return err } - s.Logger.Info("Starting GRPC server", zap.Stringer("addr", listener.Addr())) + s.telset.Logger.Info("Starting GRPC server", zap.Stringer("addr", listener.Addr())) s.grpcConn = listener s.wg.Add(1) go func() { defer s.wg.Done() if err := s.grpcServer.Serve(s.grpcConn); err != nil { - s.Logger.Error("GRPC server exited", zap.Error(err)) - s.ReportStatus(componentstatus.NewFatalErrorEvent(err)) + s.telset.Logger.Error("GRPC server exited", zap.Error(err)) + s.telset.ReportStatus(componentstatus.NewFatalErrorEvent(err)) } }() @@ -148,6 +148,6 @@ func (s *Server) Close() error { s.grpcConn.Close() s.opts.TLSGRPC.Close() s.wg.Wait() - s.ReportStatus(componentstatus.NewEvent(componentstatus.StatusStopped)) + s.telset.ReportStatus(componentstatus.NewEvent(componentstatus.StatusStopped)) return nil } diff --git a/cmd/remote-storage/app/server_test.go b/cmd/remote-storage/app/server_test.go index 71a319fcb0d..c9a78d31896 100644 --- a/cmd/remote-storage/app/server_test.go +++ b/cmd/remote-storage/app/server_test.go @@ -12,10 +12,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" - "go.opentelemetry.io/collector/component/componentstatus" - "go.opentelemetry.io/collector/config/configtelemetry" - "go.opentelemetry.io/otel/metric" - "go.opentelemetry.io/otel/metric/noop" "go.uber.org/zap" "go.uber.org/zap/zaptest/observer" "google.golang.org/grpc" @@ -45,19 +41,12 @@ func TestNewServer_CreateStorageErrors(t *testing.T) { factory.On("CreateSpanWriter").Return(nil, nil) factory.On("CreateDependencyReader").Return(nil, errors.New("no deps")).Once() factory.On("CreateDependencyReader").Return(nil, nil) - telset := telemetry.Setting{ - Logger: zap.NewNop(), - ReportStatus: func(*componentstatus.Event) {}, - LeveledMeterProvider: func(_ configtelemetry.Level) metric.MeterProvider { - return noop.NewMeterProvider() - }, - } f := func() (*Server, error) { return NewServer( &Options{GRPCHostPort: ":0"}, factory, tenancy.NewManager(&tenancy.Options{}), - telset, + telemetry.NoopSettings(), ) } _, err := f() @@ -119,7 +108,7 @@ func TestNewServer_TLSConfigError(t *testing.T) { KeyPath: "invalid/path", ClientCAPath: "invalid/path", } - telset := telemetry.Setting{ + telset := telemetry.Settings{ Logger: zap.NewNop(), ReportStatus: telemetry.HCAdapter(healthcheck.New()), } @@ -327,7 +316,7 @@ func TestServerGRPCTLS(t *testing.T) { storageMocks.reader.On("GetServices", mock.AnythingOfType("*context.valueCtx")).Return(expectedServices, nil) tm := tenancy.NewManager(&tenancy.Options{Enabled: true}) - telset := telemetry.Setting{ + telset := telemetry.Settings{ Logger: flagsSvc.Logger, ReportStatus: telemetry.HCAdapter(flagsSvc.HC()), } @@ -376,7 +365,7 @@ func TestServerHandlesPortZero(t *testing.T) { zapCore, logs := observer.New(zap.InfoLevel) flagsSvc.Logger = zap.New(zapCore) storageMocks := newStorageMocks() - telset := telemetry.Setting{ + telset := telemetry.Settings{ Logger: flagsSvc.Logger, ReportStatus: telemetry.HCAdapter(flagsSvc.HC()), } diff --git a/cmd/remote-storage/main.go b/cmd/remote-storage/main.go index 8877b92fdd8..8f1ed50e9a7 100644 --- a/cmd/remote-storage/main.go +++ b/cmd/remote-storage/main.go @@ -10,8 +10,6 @@ import ( "github.com/spf13/cobra" "github.com/spf13/viper" - "go.opentelemetry.io/collector/config/configtelemetry" - "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/metric/noop" _ "go.uber.org/automaxprocs" "go.uber.org/zap" @@ -64,19 +62,21 @@ func main() { logger.Fatal("Failed to parse options", zap.Error(err)) } + baseTelset := telemetry.Settings{ + Logger: svc.Logger, + Metrics: baseFactory, + ReportStatus: telemetry.HCAdapter(svc.HC()), + MeterProvider: noop.NewMeterProvider(), // TODO + } + storageFactory.InitFromViper(v, logger) - if err := storageFactory.Initialize(baseFactory, logger); err != nil { + if err := storageFactory.Initialize(baseTelset.Metrics, baseTelset.Logger); err != nil { logger.Fatal("Failed to init storage factory", zap.Error(err)) } tm := tenancy.NewManager(&opts.Tenancy) - telset := telemetry.Setting{ - Logger: svc.Logger, - ReportStatus: telemetry.HCAdapter(svc.HC()), - LeveledMeterProvider: func(_ configtelemetry.Level) metric.MeterProvider { - return noop.NewMeterProvider() - }, - } + telset := baseTelset // copy + telset.Metrics = metricsFactory server, err := app.NewServer(opts, storageFactory, tm, telset) if err != nil { logger.Fatal("Failed to create server", zap.Error(err)) diff --git a/crossdock/main.go b/crossdock/main.go index 686f6ff515d..1ca1ca428f1 100644 --- a/crossdock/main.go +++ b/crossdock/main.go @@ -98,7 +98,9 @@ func is2xxStatusCode(statusCode int) bool { func httpHealthCheck(logger *zap.Logger, service, healthURL string) { for i := 0; i < 240; i++ { res, err := http.Get(healthURL) - res.Body.Close() + if err == nil { + res.Body.Close() + } if err == nil && is2xxStatusCode(res.StatusCode) { logger.Info("Health check successful", zap.String("service", service)) return diff --git a/pkg/telemetry/settings.go b/pkg/telemetry/settings.go index b9204c9bff1..1f76b38e107 100644 --- a/pkg/telemetry/settings.go +++ b/pkg/telemetry/settings.go @@ -6,22 +6,25 @@ package telemetry import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componentstatus" - "go.opentelemetry.io/collector/config/configtelemetry" + "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/otel/metric" + noopmetric "go.opentelemetry.io/otel/metric/noop" "go.opentelemetry.io/otel/trace" + nooptrace "go.opentelemetry.io/otel/trace/noop" "go.uber.org/zap" + "github.com/jaegertracing/jaeger/internal/metrics/otelmetrics" "github.com/jaegertracing/jaeger/pkg/healthcheck" "github.com/jaegertracing/jaeger/pkg/metrics" ) -type Setting struct { - Logger *zap.Logger - Metrics metrics.Factory - LeveledMeterProvider func(configtelemetry.Level) metric.MeterProvider - TracerProvider trace.TracerProvider - ReportStatus func(*componentstatus.Event) - Host component.Host +type Settings struct { + Logger *zap.Logger + Metrics metrics.Factory + MeterProvider metric.MeterProvider + TracerProvider trace.TracerProvider + ReportStatus func(*componentstatus.Event) // TODO remove this + Host component.Host } func HCAdapter(hc *healthcheck.HealthCheck) func(*componentstatus.Event) { @@ -43,3 +46,27 @@ func HCAdapter(hc *healthcheck.HealthCheck) func(*componentstatus.Event) { hc.Set(hcStatus) } } + +func NoopSettings() Settings { + return Settings{ + Logger: zap.NewNop(), + Metrics: metrics.NullFactory, + MeterProvider: noopmetric.NewMeterProvider(), + TracerProvider: nooptrace.NewTracerProvider(), + ReportStatus: func(*componentstatus.Event) {}, + Host: componenttest.NewNopHost(), + } +} + +func FromOtelComponent(telset component.TelemetrySettings, host component.Host) Settings { + return Settings{ + Logger: telset.Logger, + Metrics: otelmetrics.NewFactory(telset.MeterProvider), + MeterProvider: telset.MeterProvider, + TracerProvider: telset.TracerProvider, + ReportStatus: func(event *componentstatus.Event) { + componentstatus.ReportStatus(host, event) + }, + Host: host, + } +} diff --git a/pkg/telemetry/settings_test.go b/pkg/telemetry/settings_test.go index 9b1a5cce764..718717772d6 100644 --- a/pkg/telemetry/settings_test.go +++ b/pkg/telemetry/settings_test.go @@ -4,10 +4,16 @@ package telemetry_test import ( + "errors" "testing" "github.com/stretchr/testify/assert" + "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componentstatus" + "go.opentelemetry.io/collector/component/componenttest" + noopmetric "go.opentelemetry.io/otel/metric/noop" + nooptrace "go.opentelemetry.io/otel/trace/noop" + "go.uber.org/zap" "github.com/jaegertracing/jaeger/pkg/healthcheck" "github.com/jaegertracing/jaeger/pkg/telemetry" @@ -73,6 +79,33 @@ func TestHCAdapter(t *testing.T) { } } +func TestNoopSettingss(t *testing.T) { + telset := telemetry.NoopSettings() + assert.NotNil(t, telset.Logger) + assert.NotNil(t, telset.Metrics) + assert.NotNil(t, telset.MeterProvider) + assert.NotNil(t, telset.TracerProvider) + assert.NotNil(t, telset.ReportStatus) + assert.NotNil(t, telset.Host) + telset.ReportStatus(componentstatus.NewFatalErrorEvent(errors.New("foobar"))) +} + +func TestFromOtelComponent(t *testing.T) { + otelTelset := component.TelemetrySettings{ + Logger: zap.NewNop(), + MeterProvider: noopmetric.NewMeterProvider(), + TracerProvider: nooptrace.NewTracerProvider(), + } + host := componenttest.NewNopHost() + telset := telemetry.FromOtelComponent(otelTelset, host) + assert.Equal(t, otelTelset.Logger, telset.Logger) + assert.Equal(t, otelTelset.MeterProvider, telset.MeterProvider) + assert.Equal(t, otelTelset.TracerProvider, telset.TracerProvider) + assert.Equal(t, host, telset.Host) + assert.NotNil(t, telset.ReportStatus) + telset.ReportStatus(componentstatus.NewFatalErrorEvent(errors.New("foobar"))) +} + func TestMain(m *testing.M) { testutils.VerifyGoLeaks(m) } diff --git a/plugin/storage/grpc/factory.go b/plugin/storage/grpc/factory.go index c38957bead0..e198068a7cb 100644 --- a/plugin/storage/grpc/factory.go +++ b/plugin/storage/grpc/factory.go @@ -12,7 +12,6 @@ import ( "github.com/spf13/viper" "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/config/configgrpc" "go.opentelemetry.io/collector/config/configtelemetry" "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" @@ -43,33 +42,28 @@ var ( // interface comformance checks // Factory implements storage.Factory and creates storage components backed by a storage plugin. type Factory struct { - metricsFactory metrics.Factory - logger *zap.Logger - tracerProvider trace.TracerProvider config Config + telset telemetry.Settings services *ClientPluginServices tracedRemoteConn *grpc.ClientConn untracedRemoteConn *grpc.ClientConn - host component.Host - meterProvider metric.MeterProvider } // NewFactory creates a new Factory. func NewFactory() *Factory { return &Factory{ - host: componenttest.NewNopHost(), + telset: telemetry.NoopSettings(), } } // NewFactoryWithConfig is used from jaeger(v2). func NewFactoryWithConfig( cfg Config, - telset telemetry.Setting, + telset telemetry.Settings, ) (*Factory, error) { f := NewFactory() f.config = cfg - f.host = telset.Host - f.meterProvider = telset.LeveledMeterProvider(configtelemetry.LevelNone) + f.telset = telset if err := f.Initialize(telset.Metrics, telset.Logger); err != nil { return nil, err } @@ -90,17 +84,18 @@ func (f *Factory) InitFromViper(v *viper.Viper, logger *zap.Logger) { // Initialize implements storage.Factory func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) error { - f.metricsFactory, f.logger = metricsFactory, logger - f.tracerProvider = otel.GetTracerProvider() + f.telset.Metrics = metricsFactory + f.telset.Logger = logger + f.telset.TracerProvider = otel.GetTracerProvider() - tracedTelset := getTelset(logger, f.tracerProvider, f.meterProvider) - untracedTelset := getTelset(logger, noop.NewTracerProvider(), f.meterProvider) + tracedTelset := getTelset(logger, f.telset.TracerProvider, f.telset.MeterProvider) + untracedTelset := getTelset(logger, noop.NewTracerProvider(), f.telset.MeterProvider) newClientFn := func(telset component.TelemetrySettings, opts ...grpc.DialOption) (conn *grpc.ClientConn, err error) { clientOpts := make([]configgrpc.ToClientConnOption, 0) for _, opt := range opts { clientOpts = append(clientOpts, configgrpc.WithGrpcDialOption(opt)) } - return f.config.ToClientConn(context.Background(), f.host, telset, clientOpts...) + return f.config.ToClientConn(context.Background(), f.telset.Host, telset, clientOpts...) } var err error diff --git a/plugin/storage/grpc/factory_test.go b/plugin/storage/grpc/factory_test.go index 42f2559cec3..c96897bae16 100644 --- a/plugin/storage/grpc/factory_test.go +++ b/plugin/storage/grpc/factory_test.go @@ -16,13 +16,9 @@ import ( "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/config/configauth" "go.opentelemetry.io/collector/config/configgrpc" - "go.opentelemetry.io/collector/config/configtelemetry" "go.opentelemetry.io/collector/exporter/exporterhelper" - "go.opentelemetry.io/otel/metric" - "go.opentelemetry.io/otel/metric/noop" "go.uber.org/zap" "google.golang.org/grpc" @@ -110,14 +106,7 @@ func TestNewFactoryError(t *testing.T) { Auth: &configauth.Authentication{}, }, } - telset := telemetry.Setting{ - Logger: zap.NewNop(), - LeveledMeterProvider: func(_ configtelemetry.Level) metric.MeterProvider { - return noop.NewMeterProvider() - }, - Host: componenttest.NewNopHost(), - Metrics: metrics.NullFactory, - } + telset := telemetry.NoopSettings() t.Run("with_config", func(t *testing.T) { _, err := NewFactoryWithConfig(*cfg, telset) assert.ErrorContains(t, err, "authenticator") @@ -188,14 +177,7 @@ func TestGRPCStorageFactoryWithConfig(t *testing.T) { Enabled: true, }, } - telset := telemetry.Setting{ - Logger: zap.NewNop(), - LeveledMeterProvider: func(_ configtelemetry.Level) metric.MeterProvider { - return noop.NewMeterProvider() - }, - Host: componenttest.NewNopHost(), - Metrics: metrics.NullFactory, - } + telset := telemetry.NoopSettings() f, err := NewFactoryWithConfig(cfg, telset) require.NoError(t, err) require.NoError(t, f.Close()) diff --git a/plugin/storage/integration/remote_memory_storage.go b/plugin/storage/integration/remote_memory_storage.go index a27c28bac02..74f813fa2b9 100644 --- a/plugin/storage/integration/remote_memory_storage.go +++ b/plugin/storage/integration/remote_memory_storage.go @@ -10,9 +10,6 @@ import ( "time" "github.com/stretchr/testify/require" - "go.opentelemetry.io/collector/config/configtelemetry" - "go.opentelemetry.io/otel/metric" - "go.opentelemetry.io/otel/metric/noop" "go.uber.org/zap" "go.uber.org/zap/zaptest" "google.golang.org/grpc" @@ -51,13 +48,9 @@ func StartNewRemoteMemoryStorage(t *testing.T) *RemoteMemoryStorage { require.NoError(t, storageFactory.Initialize(metrics.NullFactory, logger)) t.Logf("Starting in-process remote storage server on %s", opts.GRPCHostPort) - telset := telemetry.Setting{ - Logger: logger, - ReportStatus: telemetry.HCAdapter(healthcheck.New()), - LeveledMeterProvider: func(_ configtelemetry.Level) metric.MeterProvider { - return noop.NewMeterProvider() - }, - } + telset := telemetry.NoopSettings() + telset.Logger = logger + telset.ReportStatus = telemetry.HCAdapter(healthcheck.New()) server, err := app.NewServer(opts, storageFactory, tm, telset) require.NoError(t, err) require.NoError(t, server.Start()) diff --git a/storage/factory.go b/storage/factory.go index f3e8ea7f4af..ca4afe34259 100644 --- a/storage/factory.go +++ b/storage/factory.go @@ -18,6 +18,19 @@ import ( "github.com/jaegertracing/jaeger/storage/spanstore" ) +// BaseFactory is the same as Factory, but without the Initialize method. +// It was a design mistake originally to add Initialize to the Factory interface. +type BaseFactory interface { + // CreateSpanReader creates a spanstore.Reader. + CreateSpanReader() (spanstore.Reader, error) + + // CreateSpanWriter creates a spanstore.Writer. + CreateSpanWriter() (spanstore.Writer, error) + + // CreateDependencyReader creates a dependencystore.Reader. + CreateDependencyReader() (dependencystore.Reader, error) +} + // Factory defines an interface for a factory that can create implementations of different storage components. // Implementations are also encouraged to implement plugin.Configurable interface. // @@ -25,18 +38,10 @@ import ( // // plugin.Configurable type Factory interface { + BaseFactory // Initialize performs internal initialization of the factory, such as opening connections to the backend store. // It is called after all configuration of the factory itself has been done. Initialize(metricsFactory metrics.Factory, logger *zap.Logger) error - - // CreateSpanReader creates a spanstore.Reader. - CreateSpanReader() (spanstore.Reader, error) - - // CreateSpanWriter creates a spanstore.Writer. - CreateSpanWriter() (spanstore.Writer, error) - - // CreateDependencyReader creates a dependencystore.Reader. - CreateDependencyReader() (dependencystore.Reader, error) } // Purger defines an interface that is capable of purging the storage. diff --git a/storage/mocks/BaseFactory.go b/storage/mocks/BaseFactory.go new file mode 100644 index 00000000000..69e391526f7 --- /dev/null +++ b/storage/mocks/BaseFactory.go @@ -0,0 +1,124 @@ +// Copyright (c) The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 +// +// Run 'make generate-mocks' to regenerate. + +// Code generated by mockery. DO NOT EDIT. + +package mocks + +import ( + dependencystore "github.com/jaegertracing/jaeger/storage/dependencystore" + mock "github.com/stretchr/testify/mock" + + spanstore "github.com/jaegertracing/jaeger/storage/spanstore" +) + +// BaseFactory is an autogenerated mock type for the BaseFactory type +type BaseFactory struct { + mock.Mock +} + +// CreateDependencyReader provides a mock function with given fields: +func (_m *BaseFactory) CreateDependencyReader() (dependencystore.Reader, error) { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for CreateDependencyReader") + } + + var r0 dependencystore.Reader + var r1 error + if rf, ok := ret.Get(0).(func() (dependencystore.Reader, error)); ok { + return rf() + } + if rf, ok := ret.Get(0).(func() dependencystore.Reader); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(dependencystore.Reader) + } + } + + if rf, ok := ret.Get(1).(func() error); ok { + r1 = rf() + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// CreateSpanReader provides a mock function with given fields: +func (_m *BaseFactory) CreateSpanReader() (spanstore.Reader, error) { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for CreateSpanReader") + } + + var r0 spanstore.Reader + var r1 error + if rf, ok := ret.Get(0).(func() (spanstore.Reader, error)); ok { + return rf() + } + if rf, ok := ret.Get(0).(func() spanstore.Reader); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(spanstore.Reader) + } + } + + if rf, ok := ret.Get(1).(func() error); ok { + r1 = rf() + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// CreateSpanWriter provides a mock function with given fields: +func (_m *BaseFactory) CreateSpanWriter() (spanstore.Writer, error) { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for CreateSpanWriter") + } + + var r0 spanstore.Writer + var r1 error + if rf, ok := ret.Get(0).(func() (spanstore.Writer, error)); ok { + return rf() + } + if rf, ok := ret.Get(0).(func() spanstore.Writer); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(spanstore.Writer) + } + } + + if rf, ok := ret.Get(1).(func() error); ok { + r1 = rf() + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// NewBaseFactory creates a new instance of BaseFactory. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewBaseFactory(t interface { + mock.TestingT + Cleanup(func()) +}) *BaseFactory { + mock := &BaseFactory{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +}