From 15f14aa5f781eb375a20e152310a81364f73dee9 Mon Sep 17 00:00:00 2001 From: Sotirios Mantziaris Date: Sun, 27 Oct 2024 23:56:56 +0200 Subject: [PATCH] Fixing deprecated (#773) ## Which problem is this PR solving? ## Short description of the changes --- .golangci.yml | 9 +- client/grpc/grpc.go | 12 +-- client/grpc/grpc_test.go | 144 +---------------------------- client/grpc/integration_test.go | 146 ++++++++++++++++++++++++++++++ client/kafka/kafka.go | 2 +- client/sns/integration_test.go | 17 +--- client/sns/sns.go | 4 +- client/sqs/integration_test.go | 17 +--- client/sqs/sqs.go | 4 +- component/amqp/component.go | 2 +- component/grpc/component_test.go | 9 +- component/kafka/component.go | 1 - component/sqs/component.go | 2 +- component/sqs/integration_test.go | 36 ++------ component/sqs/metric.go | 6 +- examples/client/main.go | 9 +- examples/examples.go | 2 +- 17 files changed, 192 insertions(+), 230 deletions(-) create mode 100644 client/grpc/integration_test.go diff --git a/.golangci.yml b/.golangci.yml index 7e830a1d7..fecdae0b3 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -59,7 +59,6 @@ linters: - makezero - nilnil - reassign - - sloglint - spancheck - testifylint - wastedassign @@ -74,16 +73,10 @@ linters: - dogsled - protogetter - usestdlibvars - - testableexamples + - testableexamples fast: false issues: exclude-dirs: - vendor max-same-issues: 10 - - exclude-rules: - # Exclude some staticcheck messages - - linters: - - staticcheck - text: "SA1019:" diff --git a/client/grpc/grpc.go b/client/grpc/grpc.go index 1ffda9fee..559cad9ec 100644 --- a/client/grpc/grpc.go +++ b/client/grpc/grpc.go @@ -2,26 +2,18 @@ package grpc import ( - "context" - "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "google.golang.org/grpc" ) -// Dial creates a client connection to the given target with a tracing and -// metrics unary interceptor. -func Dial(target string, opts ...grpc.DialOption) (*grpc.ClientConn, error) { - return DialContext(context.Background(), target, opts...) -} - // DialContext creates a client connection to the given target with a context and // a tracing and metrics unary interceptor. -func DialContext(ctx context.Context, target string, opts ...grpc.DialOption) (*grpc.ClientConn, error) { +func NewClient(target string, opts ...grpc.DialOption) (*grpc.ClientConn, error) { if len(opts) == 0 { opts = make([]grpc.DialOption, 0) } opts = append(opts, grpc.WithStatsHandler(otelgrpc.NewClientHandler())) - return grpc.DialContext(ctx, target, opts...) + return grpc.NewClient(target, opts...) } diff --git a/client/grpc/grpc_test.go b/client/grpc/grpc_test.go index d8450fed2..4666e2924 100644 --- a/client/grpc/grpc_test.go +++ b/client/grpc/grpc_test.go @@ -1,78 +1,19 @@ package grpc import ( - "context" - "fmt" - "log" - "net" - "os" "testing" - "github.com/beatlabs/patron/examples" - "github.com/beatlabs/patron/internal/test" - "github.com/beatlabs/patron/observability/trace" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/sdk/trace/tracetest" "google.golang.org/grpc" - "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials/insecure" - "google.golang.org/grpc/status" - "google.golang.org/grpc/test/bufconn" ) const ( - bufSize = 1024 * 1024 - target = "bufnet" + target = "target" ) -var lis *bufconn.Listener - -type server struct { - examples.UnimplementedGreeterServer -} - -func (s *server) SayHelloStream(_ *examples.HelloRequest, _ examples.Greeter_SayHelloStreamServer) error { - return status.Error(codes.Unavailable, "streaming not supported") -} - -func (s *server) SayHello(_ context.Context, req *examples.HelloRequest) (*examples.HelloReply, error) { - if req.GetFirstname() == "" { - return nil, status.Error(codes.InvalidArgument, "first name cannot be empty") - } - return &examples.HelloReply{Message: fmt.Sprintf("Hello %s!", req.GetFirstname())}, nil -} - -func TestMain(m *testing.M) { - lis = bufconn.Listen(bufSize) - s := grpc.NewServer() - examples.RegisterGreeterServer(s, &server{}) - go func() { - if err := s.Serve(lis); err != nil { - log.Fatal(err) - } - }() - - code := m.Run() - - s.GracefulStop() - - os.Exit(code) -} - -func bufDialer(_ context.Context, _ string) (net.Conn, error) { - return lis.Dial() -} - -func TestDial(t *testing.T) { - conn, err := Dial(target, grpc.WithContextDialer(bufDialer), grpc.WithInsecure()) - require.NoError(t, err) - assert.NotNil(t, conn) - require.NoError(t, conn.Close()) -} - -func TestDialContext(t *testing.T) { +func TestNewClient(t *testing.T) { t.Parallel() type args struct { opts []grpc.DialOption @@ -83,7 +24,7 @@ func TestDialContext(t *testing.T) { }{ "success": { args: args{ - opts: []grpc.DialOption{grpc.WithContextDialer(bufDialer), grpc.WithInsecure()}, + opts: []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())}, }, }, "failure missing grpc.WithInsecure()": { @@ -94,7 +35,7 @@ func TestDialContext(t *testing.T) { for name, tt := range tests { t.Run(name, func(t *testing.T) { t.Parallel() - gotConn, err := DialContext(context.Background(), target, tt.args.opts...) + gotConn, err := NewClient(target, tt.args.opts...) if tt.expectedErr != "" { require.EqualError(t, err, tt.expectedErr) assert.Nil(t, gotConn) @@ -105,80 +46,3 @@ func TestDialContext(t *testing.T) { }) } } - -func TestSayHello(t *testing.T) { - ctx := context.Background() - conn, err := DialContext(ctx, target, grpc.WithContextDialer(bufDialer), grpc.WithTransportCredentials(insecure.NewCredentials())) - require.NoError(t, err) - defer func() { - require.NoError(t, conn.Close()) - }() - - // Tracing setup - exp := tracetest.NewInMemoryExporter() - tracePublisher := trace.Setup("test", nil, exp) - - // Metrics monitoring set up - shutdownProvider, assertCollectMetrics := test.SetupMetrics(ctx, t) - defer shutdownProvider() - - client := examples.NewGreeterClient(conn) - - tt := map[string]struct { - req *examples.HelloRequest - wantErr bool - wantCode codes.Code - wantMsg string - wantCounter int - }{ - "ok": { - req: &examples.HelloRequest{Firstname: "John"}, - wantErr: false, - wantCode: codes.OK, - wantMsg: "Hello John!", - wantCounter: 1, - }, - "invalid": { - req: &examples.HelloRequest{}, - wantErr: true, - wantCode: codes.InvalidArgument, - wantMsg: "first name cannot be empty", - wantCounter: 1, - }, - } - - for n, tt := range tt { - t.Run(n, func(t *testing.T) { - t.Cleanup(func() { exp.Reset() }) - - res, err := client.SayHello(ctx, tt.req) - if tt.wantErr { - require.Nil(t, res) - require.Error(t, err) - - rpcStatus, ok := status.FromError(err) - require.True(t, ok) - require.Equal(t, tt.wantCode, rpcStatus.Code()) - require.Equal(t, tt.wantMsg, rpcStatus.Message()) - } else { - require.NoError(t, err) - require.NotNil(t, res) - require.Equal(t, tt.wantMsg, res.GetMessage()) - } - - require.NoError(t, tracePublisher.ForceFlush(context.Background())) - - snaps := exp.GetSpans().Snapshots() - - assert.Len(t, snaps, 1) - assert.Equal(t, "examples.Greeter/SayHello", snaps[0].Name()) - assert.Equal(t, attribute.String("rpc.service", "examples.Greeter"), snaps[0].Attributes()[0]) - assert.Equal(t, attribute.String("rpc.method", "SayHello"), snaps[0].Attributes()[1]) - assert.Equal(t, attribute.String("rpc.system", "grpc"), snaps[0].Attributes()[2]) - assert.Equal(t, attribute.Int64("rpc.grpc.status_code", int64(tt.wantCode)), snaps[0].Attributes()[3]) - - // Metrics - _ = assertCollectMetrics(4) - }) - } -} diff --git a/client/grpc/integration_test.go b/client/grpc/integration_test.go new file mode 100644 index 000000000..9cf93abdd --- /dev/null +++ b/client/grpc/integration_test.go @@ -0,0 +1,146 @@ +//go:build integration + +package grpc + +import ( + "context" + "fmt" + "log" + "net" + "testing" + + "github.com/beatlabs/patron/examples" + "github.com/beatlabs/patron/internal/test" + "github.com/beatlabs/patron/observability/trace" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/sdk/trace/tracetest" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/status" +) + +type server struct { + examples.UnimplementedGreeterServer +} + +func (s *server) SayHelloStream(_ *examples.HelloRequest, _ examples.Greeter_SayHelloStreamServer) error { + return status.Error(codes.Unavailable, "streaming not supported") +} + +func (s *server) SayHello(_ context.Context, req *examples.HelloRequest) (*examples.HelloReply, error) { + if req.GetFirstname() == "" { + return nil, status.Error(codes.InvalidArgument, "first name cannot be empty") + } + return &examples.HelloReply{Message: fmt.Sprintf("Hello %s!", req.GetFirstname())}, nil +} + +func TestSayHello(t *testing.T) { + ctx := context.Background() + + client, closer, err := testServer() + require.NoError(t, err) + defer closer() + + // Tracing setup + exp := tracetest.NewInMemoryExporter() + tracePublisher := trace.Setup("test", nil, exp) + + // Metrics monitoring set up + shutdownProvider, assertCollectMetrics := test.SetupMetrics(ctx, t) + defer shutdownProvider() + + tt := map[string]struct { + req *examples.HelloRequest + wantErr bool + wantCode codes.Code + wantMsg string + wantCounter int + }{ + "ok": { + req: &examples.HelloRequest{Firstname: "John"}, + wantErr: false, + wantCode: codes.OK, + wantMsg: "Hello John!", + wantCounter: 1, + }, + "invalid": { + req: &examples.HelloRequest{}, + wantErr: true, + wantCode: codes.InvalidArgument, + wantMsg: "first name cannot be empty", + wantCounter: 1, + }, + } + + for n, tt := range tt { + t.Run(n, func(t *testing.T) { + t.Cleanup(func() { exp.Reset() }) + + res, err := client.SayHello(ctx, tt.req) + if tt.wantErr { + require.Nil(t, res) + require.Error(t, err) + + rpcStatus, ok := status.FromError(err) + require.True(t, ok) + require.Equal(t, tt.wantCode, rpcStatus.Code()) + require.Equal(t, tt.wantMsg, rpcStatus.Message()) + } else { + require.NoError(t, err) + require.NotNil(t, res) + require.Equal(t, tt.wantMsg, res.GetMessage()) + } + + require.NoError(t, tracePublisher.ForceFlush(context.Background())) + + spans := exp.GetSpans().Snapshots() + + assert.Len(t, spans, 1) + assert.Equal(t, "examples.Greeter/SayHello", spans[0].Name()) + assert.Equal(t, attribute.String("rpc.service", "examples.Greeter"), spans[0].Attributes()[0]) + assert.Equal(t, attribute.String("rpc.method", "SayHello"), spans[0].Attributes()[1]) + assert.Equal(t, attribute.String("rpc.system", "grpc"), spans[0].Attributes()[2]) + assert.Equal(t, attribute.String("net.sock.peer.addr", "127.0.0.1"), spans[0].Attributes()[3]) + // assert.Equal(t, attribute.Int64("net.sock.peer.port", 0), spans[0].Attributes()[4]) + assert.Equal(t, attribute.Int64("rpc.grpc.status_code", int64(tt.wantCode)), spans[0].Attributes()[5]) + + // Metrics + _ = assertCollectMetrics(4) + }) + } +} + +func testServer() (examples.GreeterClient, func(), error) { + lis, err := net.Listen("tcp", "localhost:0") + if err != nil { + return nil, nil, err + } + + baseServer := grpc.NewServer() + examples.RegisterGreeterServer(baseServer, &server{}) + go func() { + if err := baseServer.Serve(lis); err != nil { + log.Printf("error serving server: %v", err) + } + }() + + closer := func() { + err := lis.Close() + if err != nil { + log.Printf("error closing listener: %v", err) + } + baseServer.Stop() + } + + conn, err := NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + defer closer() + return nil, nil, err + } + + client := examples.NewGreeterClient(conn) + return client, closer, nil +} diff --git a/client/kafka/kafka.go b/client/kafka/kafka.go index 61bea87ee..b963bb27d 100644 --- a/client/kafka/kafka.go +++ b/client/kafka/kafka.go @@ -116,7 +116,7 @@ func (b *Builder) Create() (*SyncProducer, error) { } // CreateAsync a new asynchronous producer. -func (b Builder) CreateAsync() (*AsyncProducer, <-chan error, error) { +func (b *Builder) CreateAsync() (*AsyncProducer, <-chan error, error) { if len(b.errs) > 0 { return nil, nil, errors.Join(b.errs...) } diff --git a/client/sns/integration_test.go b/client/sns/integration_test.go index 553a5752d..802269259 100644 --- a/client/sns/integration_test.go +++ b/client/sns/integration_test.go @@ -22,26 +22,17 @@ func TestNewFromConfig(t *testing.T) { awsRegion := "eu-west-1" - customResolver := aws.EndpointResolverWithOptionsFunc(func(service, region string, _ ...interface{}) (aws.Endpoint, error) { - if service == sns.ServiceID && region == awsRegion { - return aws.Endpoint{ - URL: "http://localhost:4566", - SigningRegion: awsRegion, - }, nil - } - // returning EndpointNotFoundError will allow the service to fallback to it's default resolution - return aws.Endpoint{}, &aws.EndpointNotFoundError{} - }) - cfg, err := config.LoadDefaultConfig(context.Background(), config.WithRegion(awsRegion), - config.WithEndpointResolverWithOptions(customResolver), config.WithCredentialsProvider(aws.NewCredentialsCache( credentials.NewStaticCredentialsProvider("test", "test", "token"))), ) require.NoError(t, err) - client := NewFromConfig(cfg) + client := NewFromConfig(cfg, func(o *sns.Options) { + o.Region = awsRegion + o.BaseEndpoint = aws.String("http://localhost:4566") + }) // Add your assertions here to test the behavior of the client diff --git a/client/sns/sns.go b/client/sns/sns.go index 41cf1bc26..94443c474 100644 --- a/client/sns/sns.go +++ b/client/sns/sns.go @@ -7,7 +7,7 @@ import ( ) // NewFromConfig creates a new SNS client from aws.Config with OpenTelemetry instrumentation enabled. -func NewFromConfig(cfg aws.Config) *sns.Client { +func NewFromConfig(cfg aws.Config, optFns ...func(*sns.Options)) *sns.Client { otelaws.AppendMiddlewares(&cfg.APIOptions) - return sns.NewFromConfig(cfg) + return sns.NewFromConfig(cfg, optFns...) } diff --git a/client/sqs/integration_test.go b/client/sqs/integration_test.go index cf4285a15..411665166 100644 --- a/client/sqs/integration_test.go +++ b/client/sqs/integration_test.go @@ -22,26 +22,17 @@ func TestNewFromConfig(t *testing.T) { awsRegion := "eu-west-1" - customResolver := aws.EndpointResolverWithOptionsFunc(func(service, region string, _ ...interface{}) (aws.Endpoint, error) { - if service == sqs.ServiceID && region == awsRegion { - return aws.Endpoint{ - URL: "http://localhost:4566", - SigningRegion: awsRegion, - }, nil - } - // returning EndpointNotFoundError will allow the service to fallback to it's default resolution - return aws.Endpoint{}, &aws.EndpointNotFoundError{} - }) - cfg, err := config.LoadDefaultConfig(context.Background(), config.WithRegion(awsRegion), - config.WithEndpointResolverWithOptions(customResolver), config.WithCredentialsProvider(aws.NewCredentialsCache( credentials.NewStaticCredentialsProvider("test", "test", "token"))), ) require.NoError(t, err) - client := NewFromConfig(cfg) + client := NewFromConfig(cfg, func(o *sqs.Options) { + o.BaseEndpoint = aws.String("http://localhost:4566") + o.Region = awsRegion + }) // Add your assertions here to test the behavior of the client diff --git a/client/sqs/sqs.go b/client/sqs/sqs.go index f4cbeb071..38b9b9917 100644 --- a/client/sqs/sqs.go +++ b/client/sqs/sqs.go @@ -7,7 +7,7 @@ import ( ) // NewFromConfig creates a new SQS client from aws.Config with OpenTelemetry instrumentation enabled. -func NewFromConfig(cfg aws.Config) *sqs.Client { +func NewFromConfig(cfg aws.Config, optFns ...func(*sqs.Options)) *sqs.Client { otelaws.AppendMiddlewares(&cfg.APIOptions) - return sqs.NewFromConfig(cfg) + return sqs.NewFromConfig(cfg, optFns...) } diff --git a/component/amqp/component.go b/component/amqp/component.go index 75249745b..e0b577a0d 100644 --- a/component/amqp/component.go +++ b/component/amqp/component.go @@ -280,7 +280,7 @@ func (c *Component) processAndResetBatch(ctx context.Context, btc *batch) { } func (c *Component) stats(ctx context.Context, sub subscription) error { - q, err := sub.channel.QueueInspect(c.queueCfg.queue) + q, err := sub.channel.QueueDeclarePassive(c.queueCfg.queue, false, false, false, false, nil) if err != nil { return err } diff --git a/component/grpc/component_test.go b/component/grpc/component_test.go index 1bfb3f03f..096ce5d09 100644 --- a/component/grpc/component_test.go +++ b/component/grpc/component_test.go @@ -28,7 +28,7 @@ var ( ) func TestMain(m *testing.M) { - os.Setenv("OTEL_BSP_SCHEDULE_DELAY", "100") + _ = os.Setenv("OTEL_BSP_SCHEDULE_DELAY", "100") tracePublisher = patrontrace.Setup("test", nil, traceExporter) @@ -77,8 +77,7 @@ func TestComponent_Run_Unary(t *testing.T) { assert.NoError(t, cmp.Run(ctx)) chDone <- struct{}{} }() - conn, err := grpc.DialContext(ctx, "localhost:60000", grpc.WithTransportCredentials(insecure.NewCredentials()), - grpc.WithBlock()) + conn, err := grpc.NewClient("localhost:60000", grpc.WithTransportCredentials(insecure.NewCredentials())) require.NoError(t, err) c := examples.NewGreeterClient(conn) @@ -162,8 +161,8 @@ func TestComponent_Run_Stream(t *testing.T) { assert.NoError(t, cmp.Run(ctx)) chDone <- struct{}{} }() - conn, err := grpc.DialContext(ctx, "localhost:60000", grpc.WithTransportCredentials(insecure.NewCredentials()), - grpc.WithBlock()) + + conn, err := grpc.NewClient("localhost:60000", grpc.WithTransportCredentials(insecure.NewCredentials())) require.NoError(t, err) c := examples.NewGreeterClient(conn) diff --git a/component/kafka/component.go b/component/kafka/component.go index 5e795a452..5c19a26d3 100644 --- a/component/kafka/component.go +++ b/component/kafka/component.go @@ -20,7 +20,6 @@ import ( const ( consumerComponent = "kafka-consumer" - subsystem = "kafka" ) const ( diff --git a/component/sqs/component.go b/component/sqs/component.go index 5ba046559..a9b4db905 100644 --- a/component/sqs/component.go +++ b/component/sqs/component.go @@ -183,7 +183,7 @@ func (c *Component) consume(ctx context.Context, chErr chan error) { MaxNumberOfMessages: c.cfg.maxMessages, WaitTimeSeconds: c.cfg.pollWaitSeconds, VisibilityTimeout: c.cfg.visibilityTimeout, - AttributeNames: []types.QueueAttributeName{ + MessageSystemAttributeNames: []types.MessageSystemAttributeName{ sqsAttributeSentTimestamp, }, MessageAttributeNames: []string{ diff --git a/component/sqs/integration_test.go b/component/sqs/integration_test.go index 1a56cce03..ef883560f 100644 --- a/component/sqs/integration_test.go +++ b/component/sqs/integration_test.go @@ -143,36 +143,18 @@ type SQSAPI interface { } func createSQSAPI(region, endpoint string) (*sqs.Client, error) { - cfg, err := createConfig(sqs.ServiceID, region, endpoint) - if err != nil { - return nil, err - } - - api := sqs.NewFromConfig(cfg) - - return api, nil -} - -func createConfig(awsServiceID, awsRegion, awsEndpoint string) (aws.Config, error) { - customResolver := aws.EndpointResolverWithOptionsFunc(func(service, region string, _ ...interface{}) (aws.Endpoint, error) { - if service == awsServiceID && region == awsRegion { - return aws.Endpoint{ - URL: awsEndpoint, - SigningRegion: awsRegion, - }, nil - } - // returning EndpointNotFoundError will allow the service to fallback to it's default resolution - return aws.Endpoint{}, &aws.EndpointNotFoundError{} - }) - - cfg, err := awsConfig.LoadDefaultConfig(context.TODO(), - awsConfig.WithRegion(awsRegion), - awsConfig.WithEndpointResolverWithOptions(customResolver), + cfg, err := awsConfig.LoadDefaultConfig(context.Background(), + awsConfig.WithRegion(region), awsConfig.WithCredentialsProvider(aws.NewCredentialsCache(credentials.NewStaticCredentialsProvider("test", "test", ""))), ) if err != nil { - return aws.Config{}, fmt.Errorf("failed to create AWS config: %w", err) + return nil, fmt.Errorf("failed to create AWS config: %w", err) } - return cfg, nil + api := sqs.NewFromConfig(cfg, func(o *sqs.Options) { + o.BaseEndpoint = aws.String(endpoint) + o.Region = region + }) + + return api, nil } diff --git a/component/sqs/metric.go b/component/sqs/metric.go index b284a8efa..20e212c2a 100644 --- a/component/sqs/metric.go +++ b/component/sqs/metric.go @@ -31,11 +31,11 @@ func init() { } func observerMessageAge(ctx context.Context, queue string, attrs map[string]string) { - attribute, ok := attrs[sqsAttributeSentTimestamp] - if !ok || len(strings.TrimSpace(attribute)) == 0 { + attr, ok := attrs[sqsAttributeSentTimestamp] + if !ok || len(strings.TrimSpace(attr)) == 0 { return } - timestamp, err := strconv.ParseInt(attribute, 10, 64) + timestamp, err := strconv.ParseInt(attr, 10, 64) if err != nil { return } diff --git a/examples/client/main.go b/examples/client/main.go index 391e6ec98..2176e1164 100644 --- a/examples/client/main.go +++ b/examples/client/main.go @@ -131,7 +131,7 @@ func sendHTTPRequest(ctx context.Context) error { } func sendGRPCRequest(ctx context.Context) error { - cc, err := patrongrpc.DialContext(ctx, examples.GRPCTarget, grpc.WithTransportCredentials(insecure.NewCredentials())) + cc, err := patrongrpc.NewClient(examples.GRPCTarget, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { return err } @@ -157,7 +157,12 @@ func sendKafkaMessage(ctx context.Context) error { if err != nil { return err } - defer producer.Close() + defer func() { + err := producer.Close() + if err != nil { + fmt.Println(err) + } + }() msg := &sarama.ProducerMessage{ Topic: examples.KafkaTopic, diff --git a/examples/examples.go b/examples/examples.go index a6eed8c43..5b535589c 100644 --- a/examples/examples.go +++ b/examples/examples.go @@ -1,7 +1,7 @@ package examples import ( - context "context" + "context" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/config"