From 584d38cf7f625c11221992e21f0d4aa6c29a0577 Mon Sep 17 00:00:00 2001 From: Injun Song Date: Fri, 1 Sep 2023 16:59:52 +0900 Subject: [PATCH] feat(all): add gogoproto codec to grpc servers This PR provides a new function to create a gRPC server. It helps the gRPC server use gogoproto codec. --- internal/admin/admin.go | 3 ++- internal/admin/testing.go | 3 ++- internal/metarepos/raft_metadata_repository.go | 3 ++- internal/storagenode/logstream/testing.go | 2 +- internal/storagenode/storagenode.go | 3 ++- internal/storagenode/testing.go | 3 ++- pkg/rpc/manager_test.go | 2 +- pkg/rpc/server.go | 12 ++++++++++++ pkg/util/testutil/conveyutil/conveyutil.go | 3 ++- 9 files changed, 26 insertions(+), 8 deletions(-) create mode 100644 pkg/rpc/server.go diff --git a/internal/admin/admin.go b/internal/admin/admin.go index c0a74d6c2..1cfd27375 100644 --- a/internal/admin/admin.go +++ b/internal/admin/admin.go @@ -20,6 +20,7 @@ import ( "github.com/kakao/varlog/internal/admin/sfgkey" "github.com/kakao/varlog/internal/admin/snwatcher" + "github.com/kakao/varlog/pkg/rpc" "github.com/kakao/varlog/pkg/rpc/interceptors/logging" "github.com/kakao/varlog/pkg/rpc/interceptors/otelgrpc" "github.com/kakao/varlog/pkg/types" @@ -74,7 +75,7 @@ func New(ctx context.Context, opts ...Option) (*Admin, error) { return nil, err } - grpcServer := grpc.NewServer( + grpcServer := rpc.NewServer( grpc.ChainUnaryInterceptor( logging.UnaryServerInterceptor(cfg.logger), otelgrpc.UnaryServerInterceptor(telemetry.GetGlobalMeterProvider()), diff --git a/internal/admin/testing.go b/internal/admin/testing.go index 8cc531f59..1856b619d 100644 --- a/internal/admin/testing.go +++ b/internal/admin/testing.go @@ -12,6 +12,7 @@ import ( "github.com/stretchr/testify/require" "google.golang.org/grpc" + "github.com/kakao/varlog/pkg/rpc" "github.com/kakao/varlog/proto/admpb" ) @@ -67,7 +68,7 @@ func TestNewMockServer(t *testing.T, ctrl *gomock.Controller) *TestMockServer { tms := &TestMockServer{ listener: lis, - grpcServer: grpc.NewServer(), + grpcServer: rpc.NewServer(), address: addr, MockClusterManagerServer: admpb.NewMockClusterManagerServer(ctrl), } diff --git a/internal/metarepos/raft_metadata_repository.go b/internal/metarepos/raft_metadata_repository.go index 0775fa9bd..5a1c6c208 100644 --- a/internal/metarepos/raft_metadata_repository.go +++ b/internal/metarepos/raft_metadata_repository.go @@ -23,6 +23,7 @@ import ( "google.golang.org/grpc/health/grpc_health_v1" "github.com/kakao/varlog/internal/reportcommitter" + "github.com/kakao/varlog/pkg/rpc" "github.com/kakao/varlog/pkg/types" "github.com/kakao/varlog/pkg/util/container/set" "github.com/kakao/varlog/pkg/util/netutil" @@ -138,7 +139,7 @@ func NewRaftMetadataRepository(opts ...Option) *RaftMetadataRepository { mr.reportCollector = NewReportCollector(mr, mr.rpcTimeout, mr.tmStub, mr.logger.Named("report")) - mr.server = grpc.NewServer() + mr.server = rpc.NewServer() mr.healthServer = health.NewServer() grpc_health_v1.RegisterHealthServer(mr.server, mr.healthServer) NewMetadataRepositoryService(mr).Register(mr.server) diff --git a/internal/storagenode/logstream/testing.go b/internal/storagenode/logstream/testing.go index 51119c430..1bcf4092d 100644 --- a/internal/storagenode/logstream/testing.go +++ b/internal/storagenode/logstream/testing.go @@ -43,7 +43,7 @@ func (trs *testReplicateServer) SyncReplicateStream(snpb.Replicator_SyncReplicat func TestNewReplicateServer(t *testing.T, mock *mock.MockReplicatorServer) (server snpb.ReplicatorServer, rpcConn *rpc.Conn, closer func()) { trs := &testReplicateServer{ - server: grpc.NewServer(), + server: rpc.NewServer(), mock: mock, } diff --git a/internal/storagenode/storagenode.go b/internal/storagenode/storagenode.go index 1eed68edf..54d954dc6 100644 --- a/internal/storagenode/storagenode.go +++ b/internal/storagenode/storagenode.go @@ -30,6 +30,7 @@ import ( "github.com/kakao/varlog/internal/storagenode/pprof" "github.com/kakao/varlog/internal/storagenode/telemetry" "github.com/kakao/varlog/internal/storagenode/volume" + "github.com/kakao/varlog/pkg/rpc" "github.com/kakao/varlog/pkg/rpc/interceptors/logging" "github.com/kakao/varlog/pkg/types" "github.com/kakao/varlog/pkg/util/fputil" @@ -118,7 +119,7 @@ func NewStorageNode(opts ...Option) (*StorageNode, error) { sn := &StorageNode{ config: cfg, executors: executorsmap.New(hintNumExecutors), - server: grpc.NewServer(grpcServerOpts...), + server: rpc.NewServer(grpcServerOpts...), healthServer: health.NewServer(), closedC: make(chan struct{}), snPaths: snPaths, diff --git a/internal/storagenode/testing.go b/internal/storagenode/testing.go index 2187cf7e7..3ff2c1961 100644 --- a/internal/storagenode/testing.go +++ b/internal/storagenode/testing.go @@ -13,6 +13,7 @@ import ( "google.golang.org/grpc" "github.com/kakao/varlog/internal/storagenode/client" + "github.com/kakao/varlog/pkg/rpc" "github.com/kakao/varlog/pkg/types" "github.com/kakao/varlog/proto/snpb" "github.com/kakao/varlog/proto/snpb/mock" @@ -222,7 +223,7 @@ func TestNewRPCServer(t *testing.T, ctrl *gomock.Controller, snid types.StorageN trs := &testRPCServer{ listener: lis, - grpcServer: grpc.NewServer(), + grpcServer: rpc.NewServer(), address: addr, snid: snid, MockLogIOServer: mock.NewMockLogIOServer(ctrl), diff --git a/pkg/rpc/manager_test.go b/pkg/rpc/manager_test.go index ad81b64df..2ade0781e 100644 --- a/pkg/rpc/manager_test.go +++ b/pkg/rpc/manager_test.go @@ -20,7 +20,7 @@ func testNewServer(t *testing.T) (addr string, closer func()) { assert.NoError(t, err) addr = lis.Addr().String() - server := grpc.NewServer() + server := NewServer() var wg sync.WaitGroup wg.Add(1) diff --git a/pkg/rpc/server.go b/pkg/rpc/server.go new file mode 100644 index 000000000..1432895d4 --- /dev/null +++ b/pkg/rpc/server.go @@ -0,0 +1,12 @@ +package rpc + +import "google.golang.org/grpc" + +// NewServer calls grpc.NewServer function. The package +// github.com/kakao/varlog/pkg/rpc registers the gogoproto codec to the gRPC. +// Therefore calling this method rather than grpc.NewServer makes the +// application server use the gogoproto codec instead of the regular proto +// codec. +func NewServer(opts ...grpc.ServerOption) *grpc.Server { + return grpc.NewServer(opts...) +} diff --git a/pkg/util/testutil/conveyutil/conveyutil.go b/pkg/util/testutil/conveyutil/conveyutil.go index 152518543..d4b624b75 100644 --- a/pkg/util/testutil/conveyutil/conveyutil.go +++ b/pkg/util/testutil/conveyutil/conveyutil.go @@ -7,6 +7,7 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" + "github.com/kakao/varlog/pkg/rpc" "github.com/kakao/varlog/pkg/util/netutil" ) @@ -22,7 +23,7 @@ func WithServiceServer(s service, testf func(server *grpc.Server, addr string)) convey.So(err, convey.ShouldBeNil) addr := addrs[0] - server := grpc.NewServer() + server := rpc.NewServer() s.Register(server) go func() {