diff --git a/pkg/es/config/config.go b/pkg/es/config/config.go index 3e10445ce4a..d0ae9c6ad3d 100644 --- a/pkg/es/config/config.go +++ b/pkg/es/config/config.go @@ -135,9 +135,7 @@ type Configuration struct { AdaptiveSamplingLookback time.Duration `mapstructure:"adaptive_sampling_lookback"` Tags TagsAsFields `mapstructure:"tags_as_fields"` // Enabled, if set to true, enables the namespace for storage pointed to by this configuration. - Enabled bool `mapstructure:"-"` - transport *http.Transport - transportV8 *http.Transport + Enabled bool `mapstructure:"-"` } // TagsAsFields holds configuration for tag schema. @@ -304,7 +302,7 @@ func NewClient(c *Configuration, logger *zap.Logger, metricsFactory metrics.Fact } } - return eswrapper.WrapESClient(rawClient, c.transport, bulkProc, c.Version, rawClientV8, c.transportV8), nil + return eswrapper.WrapESClient(rawClient, bulkProc, c.Version, rawClientV8), nil } func newElasticsearchV8(c *Configuration, logger *zap.Logger) (*esV8.Client, error) { @@ -313,12 +311,11 @@ func newElasticsearchV8(c *Configuration, logger *zap.Logger) (*esV8.Client, err options.Username = c.Authentication.BasicAuthentication.Username options.Password = c.Authentication.BasicAuthentication.Password options.DiscoverNodesOnStart = c.Sniffing.Enabled - transport, transportV8, err := GetHTTPRoundTripper(c, logger) + transport, err := GetHTTPRoundTripper(c, logger) if err != nil { return nil, err } options.Transport = transport - c.transportV8 = transportV8 return esV8.NewClient(options) } @@ -486,11 +483,11 @@ func (c *Configuration) getConfigOptions(logger *zap.Logger) ([]elastic.ClientOp if err != nil { return options, err } - transport, httpTransport, err := GetHTTPRoundTripper(c, logger) + + transport, err := GetHTTPRoundTripper(c, logger) if err != nil { return nil, err } - c.transport = httpTransport httpClient.Transport = transport return options, nil } @@ -529,17 +526,16 @@ func addLoggerOptions(options []elastic.ClientOptionFunc, logLevel string, logge } // GetHTTPRoundTripper returns configured http.RoundTripper -func GetHTTPRoundTripper(c *Configuration, logger *zap.Logger) (http.RoundTripper, *http.Transport, error) { +func GetHTTPRoundTripper(c *Configuration, logger *zap.Logger) (http.RoundTripper, error) { if !c.TLS.Insecure { ctlsConfig, err := c.TLS.LoadTLSConfig(context.Background()) if err != nil { - return nil, nil, err + return nil, err } - httpTransport := &http.Transport{ + return &http.Transport{ Proxy: http.ProxyFromEnvironment, TLSClientConfig: ctlsConfig, - } - return c.transport, httpTransport, nil + }, nil } var transport http.RoundTripper httpTransport := &http.Transport{ @@ -550,7 +546,7 @@ func GetHTTPRoundTripper(c *Configuration, logger *zap.Logger) (http.RoundTrippe if c.TLS.CAFile != "" { ctlsConfig, err := c.TLS.LoadTLSConfig(context.Background()) if err != nil { - return nil, nil, err + return nil, err } httpTransport.TLSClientConfig = ctlsConfig transport = httpTransport @@ -563,7 +559,7 @@ func GetHTTPRoundTripper(c *Configuration, logger *zap.Logger) (http.RoundTrippe } tokenFromFile, err := loadTokenFromFile(c.Authentication.BearerTokenAuthentication.FilePath) if err != nil { - return nil, nil, err + return nil, err } token = tokenFromFile } @@ -574,7 +570,7 @@ func GetHTTPRoundTripper(c *Configuration, logger *zap.Logger) (http.RoundTrippe StaticToken: token, } } - return transport, httpTransport, nil + return transport, nil } func loadTokenFromFile(path string) (string, error) { diff --git a/pkg/es/wrapper/wrapper.go b/pkg/es/wrapper/wrapper.go index 3faffde2d02..e34b8c49590 100644 --- a/pkg/es/wrapper/wrapper.go +++ b/pkg/es/wrapper/wrapper.go @@ -22,11 +22,9 @@ import ( // ClientWrapper is a wrapper around elastic.Client type ClientWrapper struct { client *elastic.Client - transport *http.Transport bulkService *elastic.BulkProcessor esVersion uint clientV8 *esV8.Client - transportV8 *http.Transport } // GetVersion returns the ElasticSearch Version @@ -35,14 +33,12 @@ func (c ClientWrapper) GetVersion() uint { } // WrapESClient creates a ESClient out of *elastic.Client. -func WrapESClient(client *elastic.Client, transport *http.Transport, s *elastic.BulkProcessor, esVersion uint, clientV8 *esV8.Client, transportV8 *http.Transport) ClientWrapper { +func WrapESClient(client *elastic.Client, s *elastic.BulkProcessor, esVersion uint, clientV8 *esV8.Client) ClientWrapper { return ClientWrapper{ client: client, - transport: transport, bulkService: s, esVersion: esVersion, clientV8: clientV8, - transportV8: transportV8, } } @@ -99,10 +95,6 @@ func (c ClientWrapper) MultiSearch() es.MultiSearchService { // Close closes ESClient and flushes all data to the storage. func (c ClientWrapper) Close() error { c.client.Stop() - c.transport.CloseIdleConnections() - if c.transportV8 != nil { - c.transportV8.CloseIdleConnections() - } return c.bulkService.Close() } diff --git a/plugin/storage/integration/elasticsearch_test.go b/plugin/storage/integration/elasticsearch_test.go index f497fe8dea3..b9f2043acc9 100644 --- a/plugin/storage/integration/elasticsearch_test.go +++ b/plugin/storage/integration/elasticsearch_test.go @@ -23,6 +23,7 @@ import ( "github.com/jaegertracing/jaeger/pkg/config" "github.com/jaegertracing/jaeger/pkg/metrics" + "github.com/jaegertracing/jaeger/pkg/testutils" "github.com/jaegertracing/jaeger/plugin/storage/es" "github.com/jaegertracing/jaeger/storage/dependencystore" ) @@ -176,6 +177,9 @@ func testElasticsearchStorage(t *testing.T, allTagsAsFields bool) { } func TestElasticsearchStorage(t *testing.T) { + t.Cleanup(func() { + testutils.VerifyGoLeaksOnce(t) + }) testElasticsearchStorage(t, false) }