Skip to content

Commit

Permalink
feat(all): add gogoproto codec to grpc servers
Browse files Browse the repository at this point in the history
This PR provides a new function to create a gRPC server. It helps the gRPC server use gogoproto
codec.
  • Loading branch information
ijsong committed Sep 25, 2023
1 parent 844d6e8 commit 584d38c
Show file tree
Hide file tree
Showing 9 changed files with 26 additions and 8 deletions.
3 changes: 2 additions & 1 deletion internal/admin/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

"github.com/kakao/varlog/internal/admin/sfgkey"
"github.com/kakao/varlog/internal/admin/snwatcher"
"github.com/kakao/varlog/pkg/rpc"
"github.com/kakao/varlog/pkg/rpc/interceptors/logging"
"github.com/kakao/varlog/pkg/rpc/interceptors/otelgrpc"
"github.com/kakao/varlog/pkg/types"
Expand Down Expand Up @@ -74,7 +75,7 @@ func New(ctx context.Context, opts ...Option) (*Admin, error) {
return nil, err
}

grpcServer := grpc.NewServer(
grpcServer := rpc.NewServer(
grpc.ChainUnaryInterceptor(
logging.UnaryServerInterceptor(cfg.logger),
otelgrpc.UnaryServerInterceptor(telemetry.GetGlobalMeterProvider()),
Expand Down
3 changes: 2 additions & 1 deletion internal/admin/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/stretchr/testify/require"
"google.golang.org/grpc"

"github.com/kakao/varlog/pkg/rpc"
"github.com/kakao/varlog/proto/admpb"
)

Expand Down Expand Up @@ -67,7 +68,7 @@ func TestNewMockServer(t *testing.T, ctrl *gomock.Controller) *TestMockServer {

tms := &TestMockServer{
listener: lis,
grpcServer: grpc.NewServer(),
grpcServer: rpc.NewServer(),
address: addr,
MockClusterManagerServer: admpb.NewMockClusterManagerServer(ctrl),
}
Expand Down
3 changes: 2 additions & 1 deletion internal/metarepos/raft_metadata_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"google.golang.org/grpc/health/grpc_health_v1"

"github.com/kakao/varlog/internal/reportcommitter"
"github.com/kakao/varlog/pkg/rpc"
"github.com/kakao/varlog/pkg/types"
"github.com/kakao/varlog/pkg/util/container/set"
"github.com/kakao/varlog/pkg/util/netutil"
Expand Down Expand Up @@ -138,7 +139,7 @@ func NewRaftMetadataRepository(opts ...Option) *RaftMetadataRepository {

mr.reportCollector = NewReportCollector(mr, mr.rpcTimeout, mr.tmStub, mr.logger.Named("report"))

mr.server = grpc.NewServer()
mr.server = rpc.NewServer()
mr.healthServer = health.NewServer()
grpc_health_v1.RegisterHealthServer(mr.server, mr.healthServer)
NewMetadataRepositoryService(mr).Register(mr.server)
Expand Down
2 changes: 1 addition & 1 deletion internal/storagenode/logstream/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func (trs *testReplicateServer) SyncReplicateStream(snpb.Replicator_SyncReplicat

func TestNewReplicateServer(t *testing.T, mock *mock.MockReplicatorServer) (server snpb.ReplicatorServer, rpcConn *rpc.Conn, closer func()) {
trs := &testReplicateServer{
server: grpc.NewServer(),
server: rpc.NewServer(),
mock: mock,
}

Expand Down
3 changes: 2 additions & 1 deletion internal/storagenode/storagenode.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/kakao/varlog/internal/storagenode/pprof"
"github.com/kakao/varlog/internal/storagenode/telemetry"
"github.com/kakao/varlog/internal/storagenode/volume"
"github.com/kakao/varlog/pkg/rpc"
"github.com/kakao/varlog/pkg/rpc/interceptors/logging"
"github.com/kakao/varlog/pkg/types"
"github.com/kakao/varlog/pkg/util/fputil"
Expand Down Expand Up @@ -118,7 +119,7 @@ func NewStorageNode(opts ...Option) (*StorageNode, error) {
sn := &StorageNode{
config: cfg,
executors: executorsmap.New(hintNumExecutors),
server: grpc.NewServer(grpcServerOpts...),
server: rpc.NewServer(grpcServerOpts...),
healthServer: health.NewServer(),
closedC: make(chan struct{}),
snPaths: snPaths,
Expand Down
3 changes: 2 additions & 1 deletion internal/storagenode/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"google.golang.org/grpc"

"github.com/kakao/varlog/internal/storagenode/client"
"github.com/kakao/varlog/pkg/rpc"
"github.com/kakao/varlog/pkg/types"
"github.com/kakao/varlog/proto/snpb"
"github.com/kakao/varlog/proto/snpb/mock"
Expand Down Expand Up @@ -222,7 +223,7 @@ func TestNewRPCServer(t *testing.T, ctrl *gomock.Controller, snid types.StorageN

trs := &testRPCServer{
listener: lis,
grpcServer: grpc.NewServer(),
grpcServer: rpc.NewServer(),
address: addr,
snid: snid,
MockLogIOServer: mock.NewMockLogIOServer(ctrl),
Expand Down
2 changes: 1 addition & 1 deletion pkg/rpc/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func testNewServer(t *testing.T) (addr string, closer func()) {
assert.NoError(t, err)
addr = lis.Addr().String()

server := grpc.NewServer()
server := NewServer()

var wg sync.WaitGroup
wg.Add(1)
Expand Down
12 changes: 12 additions & 0 deletions pkg/rpc/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package rpc

import "google.golang.org/grpc"

// NewServer calls grpc.NewServer function. The package
// github.com/kakao/varlog/pkg/rpc registers the gogoproto codec to the gRPC.
// Therefore calling this method rather than grpc.NewServer makes the
// application server use the gogoproto codec instead of the regular proto
// codec.
func NewServer(opts ...grpc.ServerOption) *grpc.Server {
return grpc.NewServer(opts...)
}
3 changes: 2 additions & 1 deletion pkg/util/testutil/conveyutil/conveyutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"

"github.com/kakao/varlog/pkg/rpc"
"github.com/kakao/varlog/pkg/util/netutil"
)

Expand All @@ -22,7 +23,7 @@ func WithServiceServer(s service, testf func(server *grpc.Server, addr string))
convey.So(err, convey.ShouldBeNil)
addr := addrs[0]

server := grpc.NewServer()
server := rpc.NewServer()
s.Register(server)

go func() {
Expand Down

0 comments on commit 584d38c

Please sign in to comment.