Skip to content

Commit

Permalink
Fixing deprecated (#773)
Browse files Browse the repository at this point in the history
<!--
Thanks for taking precious time for making a PR.

Before creating a pull request, please make sure:
- Your PR solves one problem for which an issue exist and a solution has
been discussed
- You have read the guide for contributing
- See
https://github.com/beatlabs/patron/blob/master/README.md#how-to-contribute
- You signed all your commits (otherwise we won't be able to merge the
PR)
  - See https://github.com/beatlabs/patron/blob/master/SIGNYOURWORK.md
- You added unit tests for the new functionality
- You mention in the PR description which issue it is addressing, e.g.
"Resolves #123"
-->

## Which problem is this PR solving?

<!-- REQUIRED -->

## Short description of the changes

<!-- REQUIRED -->
  • Loading branch information
mantzas authored Oct 27, 2024
1 parent f82d5c7 commit 15f14aa
Show file tree
Hide file tree
Showing 17 changed files with 192 additions and 230 deletions.
9 changes: 1 addition & 8 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ linters:
- makezero
- nilnil
- reassign
- sloglint
- spancheck
- testifylint
- wastedassign
Expand All @@ -74,16 +73,10 @@ linters:
- dogsled
- protogetter
- usestdlibvars
- testableexamples
- testableexamples
fast: false

issues:
exclude-dirs:
- vendor
max-same-issues: 10

exclude-rules:
# Exclude some staticcheck messages
- linters:
- staticcheck
text: "SA1019:"
12 changes: 2 additions & 10 deletions client/grpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,18 @@
package grpc

import (
"context"

"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"google.golang.org/grpc"
)

// Dial creates a client connection to the given target with a tracing and
// metrics unary interceptor.
func Dial(target string, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
return DialContext(context.Background(), target, opts...)
}

// DialContext creates a client connection to the given target with a context and
// a tracing and metrics unary interceptor.
func DialContext(ctx context.Context, target string, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
func NewClient(target string, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
if len(opts) == 0 {
opts = make([]grpc.DialOption, 0)
}

opts = append(opts, grpc.WithStatsHandler(otelgrpc.NewClientHandler()))

return grpc.DialContext(ctx, target, opts...)
return grpc.NewClient(target, opts...)
}
144 changes: 4 additions & 140 deletions client/grpc/grpc_test.go
Original file line number Diff line number Diff line change
@@ -1,78 +1,19 @@
package grpc

import (
"context"
"fmt"
"log"
"net"
"os"
"testing"

"github.com/beatlabs/patron/examples"
"github.com/beatlabs/patron/internal/test"
"github.com/beatlabs/patron/observability/trace"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/trace/tracetest"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/status"
"google.golang.org/grpc/test/bufconn"
)

const (
bufSize = 1024 * 1024
target = "bufnet"
target = "target"
)

var lis *bufconn.Listener

type server struct {
examples.UnimplementedGreeterServer
}

func (s *server) SayHelloStream(_ *examples.HelloRequest, _ examples.Greeter_SayHelloStreamServer) error {
return status.Error(codes.Unavailable, "streaming not supported")
}

func (s *server) SayHello(_ context.Context, req *examples.HelloRequest) (*examples.HelloReply, error) {
if req.GetFirstname() == "" {
return nil, status.Error(codes.InvalidArgument, "first name cannot be empty")
}
return &examples.HelloReply{Message: fmt.Sprintf("Hello %s!", req.GetFirstname())}, nil
}

func TestMain(m *testing.M) {
lis = bufconn.Listen(bufSize)
s := grpc.NewServer()
examples.RegisterGreeterServer(s, &server{})
go func() {
if err := s.Serve(lis); err != nil {
log.Fatal(err)
}
}()

code := m.Run()

s.GracefulStop()

os.Exit(code)
}

func bufDialer(_ context.Context, _ string) (net.Conn, error) {
return lis.Dial()
}

func TestDial(t *testing.T) {
conn, err := Dial(target, grpc.WithContextDialer(bufDialer), grpc.WithInsecure())
require.NoError(t, err)
assert.NotNil(t, conn)
require.NoError(t, conn.Close())
}

func TestDialContext(t *testing.T) {
func TestNewClient(t *testing.T) {
t.Parallel()
type args struct {
opts []grpc.DialOption
Expand All @@ -83,7 +24,7 @@ func TestDialContext(t *testing.T) {
}{
"success": {
args: args{
opts: []grpc.DialOption{grpc.WithContextDialer(bufDialer), grpc.WithInsecure()},
opts: []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())},
},
},
"failure missing grpc.WithInsecure()": {
Expand All @@ -94,7 +35,7 @@ func TestDialContext(t *testing.T) {
for name, tt := range tests {
t.Run(name, func(t *testing.T) {
t.Parallel()
gotConn, err := DialContext(context.Background(), target, tt.args.opts...)
gotConn, err := NewClient(target, tt.args.opts...)
if tt.expectedErr != "" {
require.EqualError(t, err, tt.expectedErr)
assert.Nil(t, gotConn)
Expand All @@ -105,80 +46,3 @@ func TestDialContext(t *testing.T) {
})
}
}

func TestSayHello(t *testing.T) {
ctx := context.Background()
conn, err := DialContext(ctx, target, grpc.WithContextDialer(bufDialer), grpc.WithTransportCredentials(insecure.NewCredentials()))
require.NoError(t, err)
defer func() {
require.NoError(t, conn.Close())
}()

// Tracing setup
exp := tracetest.NewInMemoryExporter()
tracePublisher := trace.Setup("test", nil, exp)

// Metrics monitoring set up
shutdownProvider, assertCollectMetrics := test.SetupMetrics(ctx, t)
defer shutdownProvider()

client := examples.NewGreeterClient(conn)

tt := map[string]struct {
req *examples.HelloRequest
wantErr bool
wantCode codes.Code
wantMsg string
wantCounter int
}{
"ok": {
req: &examples.HelloRequest{Firstname: "John"},
wantErr: false,
wantCode: codes.OK,
wantMsg: "Hello John!",
wantCounter: 1,
},
"invalid": {
req: &examples.HelloRequest{},
wantErr: true,
wantCode: codes.InvalidArgument,
wantMsg: "first name cannot be empty",
wantCounter: 1,
},
}

for n, tt := range tt {
t.Run(n, func(t *testing.T) {
t.Cleanup(func() { exp.Reset() })

res, err := client.SayHello(ctx, tt.req)
if tt.wantErr {
require.Nil(t, res)
require.Error(t, err)

rpcStatus, ok := status.FromError(err)
require.True(t, ok)
require.Equal(t, tt.wantCode, rpcStatus.Code())
require.Equal(t, tt.wantMsg, rpcStatus.Message())
} else {
require.NoError(t, err)
require.NotNil(t, res)
require.Equal(t, tt.wantMsg, res.GetMessage())
}

require.NoError(t, tracePublisher.ForceFlush(context.Background()))

snaps := exp.GetSpans().Snapshots()

assert.Len(t, snaps, 1)
assert.Equal(t, "examples.Greeter/SayHello", snaps[0].Name())
assert.Equal(t, attribute.String("rpc.service", "examples.Greeter"), snaps[0].Attributes()[0])
assert.Equal(t, attribute.String("rpc.method", "SayHello"), snaps[0].Attributes()[1])
assert.Equal(t, attribute.String("rpc.system", "grpc"), snaps[0].Attributes()[2])
assert.Equal(t, attribute.Int64("rpc.grpc.status_code", int64(tt.wantCode)), snaps[0].Attributes()[3])

// Metrics
_ = assertCollectMetrics(4)
})
}
}
146 changes: 146 additions & 0 deletions client/grpc/integration_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
//go:build integration

package grpc

import (
"context"
"fmt"
"log"
"net"
"testing"

"github.com/beatlabs/patron/examples"
"github.com/beatlabs/patron/internal/test"
"github.com/beatlabs/patron/observability/trace"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/trace/tracetest"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/status"
)

type server struct {
examples.UnimplementedGreeterServer
}

func (s *server) SayHelloStream(_ *examples.HelloRequest, _ examples.Greeter_SayHelloStreamServer) error {
return status.Error(codes.Unavailable, "streaming not supported")
}

func (s *server) SayHello(_ context.Context, req *examples.HelloRequest) (*examples.HelloReply, error) {
if req.GetFirstname() == "" {
return nil, status.Error(codes.InvalidArgument, "first name cannot be empty")
}
return &examples.HelloReply{Message: fmt.Sprintf("Hello %s!", req.GetFirstname())}, nil
}

func TestSayHello(t *testing.T) {
ctx := context.Background()

client, closer, err := testServer()
require.NoError(t, err)
defer closer()

// Tracing setup
exp := tracetest.NewInMemoryExporter()
tracePublisher := trace.Setup("test", nil, exp)

// Metrics monitoring set up
shutdownProvider, assertCollectMetrics := test.SetupMetrics(ctx, t)
defer shutdownProvider()

tt := map[string]struct {
req *examples.HelloRequest
wantErr bool
wantCode codes.Code
wantMsg string
wantCounter int
}{
"ok": {
req: &examples.HelloRequest{Firstname: "John"},
wantErr: false,
wantCode: codes.OK,
wantMsg: "Hello John!",
wantCounter: 1,
},
"invalid": {
req: &examples.HelloRequest{},
wantErr: true,
wantCode: codes.InvalidArgument,
wantMsg: "first name cannot be empty",
wantCounter: 1,
},
}

for n, tt := range tt {
t.Run(n, func(t *testing.T) {
t.Cleanup(func() { exp.Reset() })

res, err := client.SayHello(ctx, tt.req)
if tt.wantErr {
require.Nil(t, res)
require.Error(t, err)

rpcStatus, ok := status.FromError(err)
require.True(t, ok)
require.Equal(t, tt.wantCode, rpcStatus.Code())
require.Equal(t, tt.wantMsg, rpcStatus.Message())
} else {
require.NoError(t, err)
require.NotNil(t, res)
require.Equal(t, tt.wantMsg, res.GetMessage())
}

require.NoError(t, tracePublisher.ForceFlush(context.Background()))

spans := exp.GetSpans().Snapshots()

assert.Len(t, spans, 1)
assert.Equal(t, "examples.Greeter/SayHello", spans[0].Name())
assert.Equal(t, attribute.String("rpc.service", "examples.Greeter"), spans[0].Attributes()[0])
assert.Equal(t, attribute.String("rpc.method", "SayHello"), spans[0].Attributes()[1])
assert.Equal(t, attribute.String("rpc.system", "grpc"), spans[0].Attributes()[2])
assert.Equal(t, attribute.String("net.sock.peer.addr", "127.0.0.1"), spans[0].Attributes()[3])
// assert.Equal(t, attribute.Int64("net.sock.peer.port", 0), spans[0].Attributes()[4])
assert.Equal(t, attribute.Int64("rpc.grpc.status_code", int64(tt.wantCode)), spans[0].Attributes()[5])

// Metrics
_ = assertCollectMetrics(4)
})
}
}

func testServer() (examples.GreeterClient, func(), error) {
lis, err := net.Listen("tcp", "localhost:0")
if err != nil {
return nil, nil, err
}

baseServer := grpc.NewServer()
examples.RegisterGreeterServer(baseServer, &server{})
go func() {
if err := baseServer.Serve(lis); err != nil {
log.Printf("error serving server: %v", err)
}
}()

closer := func() {
err := lis.Close()
if err != nil {
log.Printf("error closing listener: %v", err)
}
baseServer.Stop()
}

conn, err := NewClient(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
defer closer()
return nil, nil, err
}

client := examples.NewGreeterClient(conn)
return client, closer, nil
}
2 changes: 1 addition & 1 deletion client/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func (b *Builder) Create() (*SyncProducer, error) {
}

// CreateAsync a new asynchronous producer.
func (b Builder) CreateAsync() (*AsyncProducer, <-chan error, error) {
func (b *Builder) CreateAsync() (*AsyncProducer, <-chan error, error) {
if len(b.errs) > 0 {
return nil, nil, errors.Join(b.errs...)
}
Expand Down
Loading

0 comments on commit 15f14aa

Please sign in to comment.