Skip to content

Commit

Permalink
refactor:重构grpc-go-polaris的相关版本设计
Browse files Browse the repository at this point in the history
  • Loading branch information
chuntaojun committed Jun 20, 2024
1 parent 8fc96c7 commit 1a6227d
Show file tree
Hide file tree
Showing 5 changed files with 236 additions and 21 deletions.
21 changes: 11 additions & 10 deletions examples/quickstart/provider/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ import (
"net"
"time"

"google.golang.org/grpc"

polaris "github.com/polarismesh/grpc-go-polaris"
"github.com/polarismesh/grpc-go-polaris/examples/common/pb"
)
Expand All @@ -45,23 +43,26 @@ func (h *EchoQuickStartService) Echo(ctx context.Context, req *pb.EchoRequest) (
}

func main() {
srv := grpc.NewServer()
address := fmt.Sprintf("0.0.0.0:%d", listenPort)
listen, err := net.Listen("tcp", address)
if err != nil {
log.Fatalf("Failed to addr %s: %v", address, err)
}
pb.RegisterEchoServerServer(srv, &EchoQuickStartService{

srv, err := polaris.NewServer(polaris.WithServiceName("QuickStartEchoServerGRPC"),
polaris.WithServerHost("127.0.0.1"),
polaris.WithDelayRegisterEnable(&polaris.WaitDelayStrategy{WaitTime: 10 * time.Second}),
polaris.WithGracefulStopEnable(10*time.Second))
if err != nil {
log.Fatalf("Failed to addr %s: %v", address, err)
}

pb.RegisterEchoServerServer(srv.Server, &EchoQuickStartService{
actualPort: listen.Addr().(*net.TCPAddr).Port,
})

// 启动服务
if err := polaris.Serve(srv, listen,
polaris.WithServiceName("QuickStartEchoServerGRPC"),
polaris.WithServerHost("127.0.0.1"),
polaris.WithDelayRegisterEnable(&polaris.WaitDelayStrategy{WaitTime: 10 * time.Second}),
polaris.WithGracefulStopEnable(10*time.Second),
); nil != err {
if err := srv.Serve(listen); nil != err {
log.Printf("listen err: %v", err)
}
}
23 changes: 13 additions & 10 deletions examples/ratelimit/provider/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ import (
"log"
"net"

"google.golang.org/grpc"

polaris "github.com/polarismesh/grpc-go-polaris"
"github.com/polarismesh/grpc-go-polaris/examples/common/pb"
)
Expand All @@ -48,15 +46,20 @@ func main() {
if err != nil {
log.Fatalf("Failed to addr %s: %v", address, err)
}
listenAddr := listen.Addr().String()
fmt.Printf("listen address is %s\n", listenAddr)
interceptor := polaris.NewRateLimitInterceptor().WithServiceName("RateLimitEchoServerGRPC")
srv := grpc.NewServer(grpc.UnaryInterceptor(interceptor.UnaryInterceptor))
pb.RegisterEchoServerServer(srv, &EchoRateLimitService{})

srv, err := polaris.NewServer(polaris.WithServiceName("RateLimitEchoServerGRPC"),
polaris.WithServerHost("127.0.0.1"),
// 开启限流能力
polaris.WithPolarisRateLimit(),
)
if err != nil {
log.Fatalf("Failed to addr %s: %v", address, err)
}

pb.RegisterEchoServerServer(srv.Server, &EchoRateLimitService{})

// 启动服务
if err := polaris.Serve(srv, listen,
polaris.WithServiceName("RateLimitEchoServerGRPC"),
); nil != err {
if err := srv.Serve(listen); nil != err {
log.Printf("listen err: %v", err)
}
}
5 changes: 5 additions & 0 deletions ratelimit.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ func NewRateLimitInterceptor() *RateLimitInterceptor {
return &RateLimitInterceptor{limitAPI: api.NewLimitAPIByContext(polarisCtx)}
}

// NewRateLimitInterceptor creates a new RateLimitInterceptor.
func newRateLimitInterceptor(sdkCtx api.SDKContext) *RateLimitInterceptor {
return &RateLimitInterceptor{limitAPI: api.NewLimitAPIByContext(sdkCtx)}
}

// WithNamespace sets the namespace of the service.
func (p *RateLimitInterceptor) WithNamespace(namespace string) *RateLimitInterceptor {
p.namespace = namespace
Expand Down
189 changes: 188 additions & 1 deletion server.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,200 @@ import (
"google.golang.org/protobuf/proto"
)

// RegisterContext context parameters by register
type RegisterContext struct {
providerAPI api.ProviderAPI
registerRequests []*api.InstanceRegisterRequest
}

// Server encapsulated server with gRPC option
type Server struct {
gServer *grpc.Server
*grpc.Server
serverOptions serverOptions
sdkCtx api.SDKContext
registerContext *RegisterContext
}

// NewServer start polaris server
func NewServer(opts ...ServerOption) (*Server, error) {
srv := &Server{}
if err := srv.initResource(opts...); err != nil {
return nil, err
}

if srv.serverOptions.enableRatelimit != nil && *srv.serverOptions.enableRatelimit {
// 添加北极星限流的 gRPC Interceptor
srv.serverOptions.gRPCServerOptions = append(srv.serverOptions.gRPCServerOptions,
grpc.ChainUnaryInterceptor(newRateLimitInterceptor(srv.sdkCtx).
WithNamespace(srv.serverOptions.namespace).
WithServiceName(srv.serverOptions.svcName).
UnaryInterceptor))
}

gSrv := grpc.NewServer(srv.serverOptions.gRPCServerOptions...)
srv.Server = gSrv
return srv, nil
}

func (srv *Server) initResource(opts ...ServerOption) error {
srv.serverOptions = serverOptions{}

for _, opt := range opts {
opt.apply(&srv.serverOptions)
}
srv.serverOptions.setDefault()
registerContext := &RegisterContext{}
polarisCtx, err := api.InitContextByConfig(srv.serverOptions.config)
if nil != err {
return err
}

if *srv.serverOptions.delayRegisterEnable {
delayStrategy := srv.serverOptions.delayRegisterStrategy
for {
if delayStrategy.Allow() {
break
}
time.Sleep(100 * time.Millisecond)
}
}

registerContext.providerAPI = api.NewProviderAPIByContext(polarisCtx)

srv.sdkCtx = polarisCtx
srv.registerContext = registerContext
return nil
}

func (srv *Server) doRegister(lis net.Listener) error {
if len(srv.serverOptions.host) == 0 {
host, err := getLocalHost(srv.sdkCtx.GetConfig().GetGlobal().GetServerConnector().GetAddresses()[0])
if nil != err {
return fmt.Errorf("error occur while fetching localhost: %w", err)
}
srv.serverOptions.host = host
}
port, err := parsePort(lis.Addr().String())
if nil != err {
return fmt.Errorf("error occur while parsing port from listener: %w", err)
}
srv.serverOptions.port = port
svcInfos := buildServiceNames(srv.Server, srv)

for _, name := range svcInfos {
registerRequest := buildRegisterInstanceRequest(srv, name)
srv.registerContext.registerRequests = append(srv.registerContext.registerRequests, registerRequest)
resp, err := srv.registerContext.providerAPI.RegisterInstance(registerRequest)
if nil != err {
deregisterServices(srv.registerContext)
return fmt.Errorf("fail to register service %s: %w", name, err)
}
grpclog.Infof("[Polaris][Naming] success to register %s:%d to service %s(%s), id %s",
registerRequest.Host, registerRequest.Port, name, registerRequest.Namespace, resp.InstanceID)
}
return nil
}

// Serve 代理 gRPC Server 的 Serve
func (srv *Server) Serve(lis net.Listener) error {
if err := srv.doRegister(lis); err != nil {
return err
}

go func() {
c := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
s := <-c
log.Printf("[Polaris][Naming] receive quit signal: %v", s)
signal.Stop(c)
srv.Stop()
}()

return srv.Server.Serve(lis)
}

// Stop deregister and stop
func (s *Server) Stop() {
s.Deregister()

if !*s.serverOptions.gracefulStopEnable {
s.Server.Stop()
return
}

ctx, cancel := context.WithDeadline(context.Background(),
time.Now().Add(s.serverOptions.gracefulStopMaxWaitDuration))
go func() {
s.Server.GracefulStop()
cancel()
}()

<-ctx.Done()
}

// Deregister deregister services from polaris
func (s *Server) Deregister() {
deregisterServices(s.registerContext)
}

// Serve start polaris server
func Serve(gSrv *grpc.Server, lis net.Listener, opts ...ServerOption) error {
pSrv, err := Register(gSrv, lis, opts...)
if err != nil {
log.Fatalf("polaris register err: %v", err)
}

go func() {
c := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
s := <-c
log.Printf("[Polaris][Naming] receive quit signal: %v", s)
signal.Stop(c)
pSrv.Stop()
}()

return gSrv.Serve(lis)
}

// Register server as polaris instances
func Register(gSrv *grpc.Server, lis net.Listener, opts ...ServerOption) (*Server, error) {
srv := &Server{Server: gSrv}
if err := srv.initResource(opts...); err != nil {
return nil, err
}
return srv, srv.doRegister(lis)
}

func buildServiceNames(gSrv *grpc.Server, svr *Server) []string {
svcInfo := gSrv.GetServiceInfo()
ret := make([]string, 0, len(svcInfo))
for k := range svcInfo {
ret = append(ret, k)
}

if len(svr.serverOptions.svcName) != 0 {
ret = []string{
svr.serverOptions.svcName,
}
}

return ret
}

func buildRegisterInstanceRequest(srv *Server, serviceName string) *api.InstanceRegisterRequest {
registerRequest := &api.InstanceRegisterRequest{}
registerRequest.Namespace = srv.serverOptions.namespace
registerRequest.Service = serviceName
registerRequest.Host = srv.serverOptions.host
registerRequest.Port = srv.serverOptions.port
registerRequest.Protocol = proto.String("grpc")
registerRequest.Metadata = srv.serverOptions.metadata
registerRequest.Version = proto.String(srv.serverOptions.version)
registerRequest.ServiceToken = srv.serverOptions.token
registerRequest.SetTTL(srv.serverOptions.ttl)
return registerRequest
}

func getLocalHost(serverAddr string) (string, error) {
conn, err := net.Dial("tcp", serverAddr)
if nil != err {
Expand Down
19 changes: 19 additions & 0 deletions server_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type ctrlOptions struct {
delayRegisterStrategy DelayStrategy
gracefulStopEnable *bool
gracefulStopMaxWaitDuration time.Duration
enableRatelimit *bool
}

func (s *serverOptions) setDefault() {
Expand All @@ -68,6 +69,9 @@ func (s *serverOptions) setDefault() {
setGracefulStopMaxWaitDuration(s, DefaultGracefulStopMaxWaitDuration)
}
}
if s.config == nil {
s.config = PolarisConfig()
}
}

// DelayStrategy delay register strategy. e.g. wait some time
Expand Down Expand Up @@ -258,3 +262,18 @@ func WithPort(port int) ServerOption {
options.port = port
})
}

// WithServerPolarisConfig set polaris configuration
func WithServerPolarisConfig(polarisCfg config.Configuration) ServerOption {
return newFuncServerOption(func(options *serverOptions) {
options.config = polarisCfg
})
}

// WithPolarisLimit 开启北极星服务端限流能力
func WithPolarisRateLimit() ServerOption {
return newFuncServerOption(func(options *serverOptions) {
enable := true
options.enableRatelimit = &enable
})
}

0 comments on commit 1a6227d

Please sign in to comment.