From fceaaa199b461c77c0526ece9ab3087bf04a4ffd Mon Sep 17 00:00:00 2001 From: icey-yu <119291641+icey-yu@users.noreply.github.com> Date: Fri, 6 Dec 2024 17:25:42 +0800 Subject: [PATCH] feat: compatible autoSetPorts (#2929) --- config/openim-msggateway.yml | 5 + config/openim-push.yml | 6 ++ config/openim-rpc-auth.yml | 5 + config/openim-rpc-conversation.yml | 5 + config/openim-rpc-friend.yml | 5 + config/openim-rpc-group.yml | 5 + config/openim-rpc-msg.yml | 5 + config/openim-rpc-third.yml | 5 + config/openim-rpc-user.yml | 5 + internal/msggateway/hub_server.go | 3 +- internal/msggateway/init.go | 3 +- internal/tools/addr/addr.go | 165 ----------------------------- pkg/common/cmd/auth.go | 1 + pkg/common/cmd/conversation.go | 1 + pkg/common/cmd/friend.go | 1 + pkg/common/cmd/group.go | 1 + pkg/common/cmd/msg.go | 1 + pkg/common/cmd/push.go | 1 + pkg/common/cmd/third.go | 1 + pkg/common/cmd/user.go | 1 + pkg/common/config/config.go | 60 ++++++----- pkg/common/startrpc/start.go | 108 ++++++++++--------- 22 files changed, 150 insertions(+), 243 deletions(-) delete mode 100644 internal/tools/addr/addr.go diff --git a/config/openim-msggateway.yml b/config/openim-msggateway.yml index 84e232ccb4..74eab35d54 100644 --- a/config/openim-msggateway.yml +++ b/config/openim-msggateway.yml @@ -1,6 +1,11 @@ rpc: # The IP address where this RPC service registers itself; if left blank, it defaults to the internal network IP registerIP: + # autoSetPorts indicates whether to automatically set the ports + autoSetPorts: true + # List of ports that the RPC service listens on; configuring multiple ports will launch multiple instances. These must match the number of configured prometheus ports + # It will only take effect when autoSetPorts is set to false. + ports: [ 10140, 10141, 10142, 10143, 10144, 10145, 10146, 10147, 10148, 10149, 10150, 10151, 10152, 10153, 10154, 10155 ] prometheus: # Enable or disable Prometheus monitoring diff --git a/config/openim-push.yml b/config/openim-push.yml index 2db09b37e0..53c88ed410 100644 --- a/config/openim-push.yml +++ b/config/openim-push.yml @@ -3,6 +3,12 @@ rpc: registerIP: # IP address that the RPC service listens on; setting to 0.0.0.0 listens on both internal and external IPs. If left blank, it automatically uses the internal network IP listenIP: 0.0.0.0 + # autoSetPorts indicates whether to automatically set the ports + autoSetPorts: true + # List of ports that the RPC service listens on; configuring multiple ports will launch multiple instances. These must match the number of configured prometheus ports + # It will only take effect when autoSetPorts is set to false. + ports: [ 10170, 10171, 10172, 10173, 10174, 10175, 10176, 10177, 10178, 10179, 10180, 10181, 10182, 10183, 10184, 10185 ] + prometheus: # Enable or disable Prometheus monitoring diff --git a/config/openim-rpc-auth.yml b/config/openim-rpc-auth.yml index 9ebd78070b..230aa8720a 100644 --- a/config/openim-rpc-auth.yml +++ b/config/openim-rpc-auth.yml @@ -3,6 +3,11 @@ rpc: registerIP: # IP address that the RPC service listens on; setting to 0.0.0.0 listens on both internal and external IPs. If left blank, it automatically uses the internal network IP listenIP: 0.0.0.0 + # autoSetPorts indicates whether to automatically set the ports + autoSetPorts: true + # List of ports that the RPC service listens on; configuring multiple ports will launch multiple instances. These must match the number of configured prometheus ports + # It will only take effect when autoSetPorts is set to false. + ports: [ 10200 ] prometheus: diff --git a/config/openim-rpc-conversation.yml b/config/openim-rpc-conversation.yml index 1cb56ac606..ed61f70102 100644 --- a/config/openim-rpc-conversation.yml +++ b/config/openim-rpc-conversation.yml @@ -3,6 +3,11 @@ rpc: registerIP: # IP address that the RPC service listens on; setting to 0.0.0.0 listens on both internal and external IPs. If left blank, it automatically uses the internal network IP listenIP: 0.0.0.0 + # autoSetPorts indicates whether to automatically set the ports + autoSetPorts: true + # List of ports that the RPC service listens on; configuring multiple ports will launch multiple instances. These must match the number of configured prometheus ports + # It will only take effect when autoSetPorts is set to false. + ports: [ 10220 ] prometheus: # Enable or disable Prometheus monitoring diff --git a/config/openim-rpc-friend.yml b/config/openim-rpc-friend.yml index 11b805299c..e10ef496a6 100644 --- a/config/openim-rpc-friend.yml +++ b/config/openim-rpc-friend.yml @@ -3,6 +3,11 @@ rpc: registerIP: # IP address that the RPC service listens on; setting to 0.0.0.0 listens on both internal and external IPs. If left blank, it automatically uses the internal network IP listenIP: 0.0.0.0 + # autoSetPorts indicates whether to automatically set the ports + autoSetPorts: true + # List of ports that the RPC service listens on; configuring multiple ports will launch multiple instances. These must match the number of configured prometheus ports + # It will only take effect when autoSetPorts is set to false. + ports: [ 10240 ] prometheus: # Enable or disable Prometheus monitoring diff --git a/config/openim-rpc-group.yml b/config/openim-rpc-group.yml index d16ae7b02a..9f5ceababd 100644 --- a/config/openim-rpc-group.yml +++ b/config/openim-rpc-group.yml @@ -3,6 +3,11 @@ rpc: registerIP: # IP address that the RPC service listens on; setting to 0.0.0.0 listens on both internal and external IPs. If left blank, it automatically uses the internal network IP listenIP: 0.0.0.0 + # autoSetPorts indicates whether to automatically set the ports + autoSetPorts: true + # List of ports that the RPC service listens on; configuring multiple ports will launch multiple instances. These must match the number of configured prometheus ports + # It will only take effect when autoSetPorts is set to false. + ports: [ 10260 ] prometheus: # Enable or disable Prometheus monitoring diff --git a/config/openim-rpc-msg.yml b/config/openim-rpc-msg.yml index 5917ff5ae9..72e055a5d5 100644 --- a/config/openim-rpc-msg.yml +++ b/config/openim-rpc-msg.yml @@ -3,6 +3,11 @@ rpc: registerIP: # IP address that the RPC service listens on; setting to 0.0.0.0 listens on both internal and external IPs. If left blank, it automatically uses the internal network IP listenIP: 0.0.0.0 + # autoSetPorts indicates whether to automatically set the ports + autoSetPorts: true + # List of ports that the RPC service listens on; configuring multiple ports will launch multiple instances. These must match the number of configured prometheus ports + # It will only take effect when autoSetPorts is set to false. + ports: [ 10280 ] prometheus: # Enable or disable Prometheus monitoring diff --git a/config/openim-rpc-third.yml b/config/openim-rpc-third.yml index 54123d8ea3..9578e82c65 100644 --- a/config/openim-rpc-third.yml +++ b/config/openim-rpc-third.yml @@ -3,6 +3,11 @@ rpc: registerIP: # IP address that the RPC service listens on; setting to 0.0.0.0 listens on both internal and external IPs. If left blank, it automatically uses the internal network IP listenIP: 0.0.0.0 + # autoSetPorts indicates whether to automatically set the ports + autoSetPorts: true + # List of ports that the RPC service listens on; configuring multiple ports will launch multiple instances. These must match the number of configured prometheus ports + # It will only take effect when autoSetPorts is set to false. + ports: [ 10300 ] prometheus: # Enable or disable Prometheus monitoring diff --git a/config/openim-rpc-user.yml b/config/openim-rpc-user.yml index 4bd6444a74..6f7b9648b0 100644 --- a/config/openim-rpc-user.yml +++ b/config/openim-rpc-user.yml @@ -3,6 +3,11 @@ rpc: registerIP: # Listening IP; 0.0.0.0 means both internal and external IPs are listened to, if blank, the internal network IP is automatically obtained by default listenIP: 0.0.0.0 + # autoSetPorts indicates whether to automatically set the ports + autoSetPorts: true + # List of ports that the RPC service listens on; configuring multiple ports will launch multiple instances. These must match the number of configured prometheus ports + # It will only take effect when autoSetPorts is set to false. + ports: [ 10320 ] prometheus: # Whether to enable prometheus diff --git a/internal/msggateway/hub_server.go b/internal/msggateway/hub_server.go index 04376b27f1..b6760ed0f3 100644 --- a/internal/msggateway/hub_server.go +++ b/internal/msggateway/hub_server.go @@ -46,7 +46,8 @@ func (s *Server) InitServer(ctx context.Context, config *Config, disCov discover func (s *Server) Start(ctx context.Context, index int, conf *Config) error { return startrpc.Start(ctx, &conf.Discovery, &conf.MsgGateway.Prometheus, conf.MsgGateway.ListenIP, - index, + conf.MsgGateway.RPC.RegisterIP, + conf.MsgGateway.RPC.AutoSetPorts, conf.MsgGateway.RPC.Ports, index, conf.Share.RpcRegisterName.MessageGateway, &conf.Share, conf, diff --git a/internal/msggateway/init.go b/internal/msggateway/init.go index 77bccc88cf..1796a54822 100644 --- a/internal/msggateway/init.go +++ b/internal/msggateway/init.go @@ -35,7 +35,8 @@ type Config struct { // Start run ws server. func Start(ctx context.Context, index int, conf *Config) error { - log.CInfo(ctx, "MSG-GATEWAY server is initializing", + log.CInfo(ctx, "MSG-GATEWAY server is initializing", "autoSetPorts", conf.MsgGateway.RPC.AutoSetPorts, + "rpcPorts", conf.MsgGateway.RPC.Ports, "wsPort", conf.MsgGateway.LongConnSvr.Ports, "prometheusPorts", conf.MsgGateway.Prometheus.Ports) wsPort, err := datautil.GetElemByIndex(conf.MsgGateway.LongConnSvr.Ports, index) if err != nil { diff --git a/internal/tools/addr/addr.go b/internal/tools/addr/addr.go deleted file mode 100644 index b7cf8a2dbf..0000000000 --- a/internal/tools/addr/addr.go +++ /dev/null @@ -1,165 +0,0 @@ -// addr provides functions to retrieve local IP addresses from device interfaces. -package addr - -import ( - "net" - - "github.com/pkg/errors" -) - -var ( - // ErrIPNotFound no IP address found, and explicit IP not provided. - ErrIPNotFound = errors.New("no IP address found, and explicit IP not provided") -) - -// IsLocal checks whether an IP belongs to one of the device's interfaces. -func IsLocal(addr string) bool { - // Extract the host - host, _, err := net.SplitHostPort(addr) - if err == nil { - addr = host - } - - if addr == "localhost" { - return true - } - - // Check against all local ips - for _, ip := range IPs() { - if addr == ip { - return true - } - } - - return false -} - -// Extract returns a valid IP address. If the address provided is a valid -// address, it will be returned directly. Otherwise, the available interfaces -// will be iterated over to find an IP address, preferably private. -func Extract(addr string) (string, error) { - // if addr is already specified then it's directly returned - if len(addr) > 0 && (addr != "0.0.0.0" && addr != "[::]" && addr != "::") { - return addr, nil - } - - var ( - addrs []net.Addr - loAddrs []net.Addr - ) - - ifaces, err := net.Interfaces() - if err != nil { - return "", errors.Wrap(err, "failed to get interfaces") - } - - for _, iface := range ifaces { - ifaceAddrs, err := iface.Addrs() - if err != nil { - // ignore error, interface can disappear from system - continue - } - - if iface.Flags&net.FlagLoopback != 0 { - loAddrs = append(loAddrs, ifaceAddrs...) - continue - } - - addrs = append(addrs, ifaceAddrs...) - } - - // Add loopback addresses to the end of the list - addrs = append(addrs, loAddrs...) - - // Try to find private IP in list, public IP otherwise - ip, err := findIP(addrs) - if err != nil { - return "", err - } - - return ip.String(), nil -} - -// IPs returns all available interface IP addresses. -func IPs() []string { - ifaces, err := net.Interfaces() - if err != nil { - return nil - } - - var ipAddrs []string - - for _, i := range ifaces { - addrs, err := i.Addrs() - if err != nil { - continue - } - - for _, addr := range addrs { - var ip net.IP - switch v := addr.(type) { - case *net.IPNet: - ip = v.IP - case *net.IPAddr: - ip = v.IP - } - - if ip == nil { - continue - } - - ipAddrs = append(ipAddrs, ip.String()) - } - } - - return ipAddrs -} - -// findIP will return the first private IP available in the list. -// If no private IP is available it will return the first public IP, if present. -// If no public IP is available, it will return the first loopback IP, if present. -func findIP(addresses []net.Addr) (net.IP, error) { - var publicIP net.IP - var localIP net.IP - - for _, rawAddr := range addresses { - var ip net.IP - switch addr := rawAddr.(type) { - case *net.IPAddr: - ip = addr.IP - case *net.IPNet: - ip = addr.IP - default: - continue - } - - if ip.IsLoopback() { - if localIP == nil { - localIP = ip - } - continue - } - - if !ip.IsPrivate() { - if publicIP == nil { - publicIP = ip - } - continue - } - - // Return private IP if available - return ip, nil - } - - // Return public or virtual IP - if len(publicIP) > 0 { - return publicIP, nil - } - - // Return local IP - if len(localIP) > 0 { - return localIP, nil - } - - return nil, ErrIPNotFound -} diff --git a/pkg/common/cmd/auth.go b/pkg/common/cmd/auth.go index f8232bc260..34f450fb19 100644 --- a/pkg/common/cmd/auth.go +++ b/pkg/common/cmd/auth.go @@ -55,5 +55,6 @@ func (a *AuthRpcCmd) Exec() error { func (a *AuthRpcCmd) runE() error { return startrpc.Start(a.ctx, &a.authConfig.Discovery, &a.authConfig.RpcConfig.Prometheus, a.authConfig.RpcConfig.RPC.ListenIP, + a.authConfig.RpcConfig.RPC.RegisterIP, a.authConfig.RpcConfig.RPC.AutoSetPorts, a.authConfig.RpcConfig.RPC.Ports, a.Index(), a.authConfig.Share.RpcRegisterName.Auth, &a.authConfig.Share, a.authConfig, auth.Start) } diff --git a/pkg/common/cmd/conversation.go b/pkg/common/cmd/conversation.go index 6a0da11e5d..f8ec570a79 100644 --- a/pkg/common/cmd/conversation.go +++ b/pkg/common/cmd/conversation.go @@ -57,5 +57,6 @@ func (a *ConversationRpcCmd) Exec() error { func (a *ConversationRpcCmd) runE() error { return startrpc.Start(a.ctx, &a.conversationConfig.Discovery, &a.conversationConfig.RpcConfig.Prometheus, a.conversationConfig.RpcConfig.RPC.ListenIP, + a.conversationConfig.RpcConfig.RPC.RegisterIP, a.conversationConfig.RpcConfig.RPC.AutoSetPorts, a.conversationConfig.RpcConfig.RPC.Ports, a.Index(), a.conversationConfig.Share.RpcRegisterName.Conversation, &a.conversationConfig.Share, a.conversationConfig, conversation.Start) } diff --git a/pkg/common/cmd/friend.go b/pkg/common/cmd/friend.go index 2ff25090e9..786be0d8e9 100644 --- a/pkg/common/cmd/friend.go +++ b/pkg/common/cmd/friend.go @@ -58,5 +58,6 @@ func (a *FriendRpcCmd) Exec() error { func (a *FriendRpcCmd) runE() error { return startrpc.Start(a.ctx, &a.relationConfig.Discovery, &a.relationConfig.RpcConfig.Prometheus, a.relationConfig.RpcConfig.RPC.ListenIP, + a.relationConfig.RpcConfig.RPC.RegisterIP, a.relationConfig.RpcConfig.RPC.AutoSetPorts, a.relationConfig.RpcConfig.RPC.Ports, a.Index(), a.relationConfig.Share.RpcRegisterName.Friend, &a.relationConfig.Share, a.relationConfig, relation.Start) } diff --git a/pkg/common/cmd/group.go b/pkg/common/cmd/group.go index f1757c7fa2..e405c91355 100644 --- a/pkg/common/cmd/group.go +++ b/pkg/common/cmd/group.go @@ -59,5 +59,6 @@ func (a *GroupRpcCmd) Exec() error { func (a *GroupRpcCmd) runE() error { return startrpc.Start(a.ctx, &a.groupConfig.Discovery, &a.groupConfig.RpcConfig.Prometheus, a.groupConfig.RpcConfig.RPC.ListenIP, + a.groupConfig.RpcConfig.RPC.RegisterIP, a.groupConfig.RpcConfig.RPC.AutoSetPorts, a.groupConfig.RpcConfig.RPC.Ports, a.Index(), a.groupConfig.Share.RpcRegisterName.Group, &a.groupConfig.Share, a.groupConfig, group.Start, versionctx.EnableVersionCtx()) } diff --git a/pkg/common/cmd/msg.go b/pkg/common/cmd/msg.go index ae718a11d1..3f1a2fa31c 100644 --- a/pkg/common/cmd/msg.go +++ b/pkg/common/cmd/msg.go @@ -59,5 +59,6 @@ func (a *MsgRpcCmd) Exec() error { func (a *MsgRpcCmd) runE() error { return startrpc.Start(a.ctx, &a.msgConfig.Discovery, &a.msgConfig.RpcConfig.Prometheus, a.msgConfig.RpcConfig.RPC.ListenIP, + a.msgConfig.RpcConfig.RPC.RegisterIP, a.msgConfig.RpcConfig.RPC.AutoSetPorts, a.msgConfig.RpcConfig.RPC.Ports, a.Index(), a.msgConfig.Share.RpcRegisterName.Msg, &a.msgConfig.Share, a.msgConfig, msg.Start) } diff --git a/pkg/common/cmd/push.go b/pkg/common/cmd/push.go index 2f9e248f14..e0f6b39f65 100644 --- a/pkg/common/cmd/push.go +++ b/pkg/common/cmd/push.go @@ -59,5 +59,6 @@ func (a *PushRpcCmd) Exec() error { func (a *PushRpcCmd) runE() error { return startrpc.Start(a.ctx, &a.pushConfig.Discovery, &a.pushConfig.RpcConfig.Prometheus, a.pushConfig.RpcConfig.RPC.ListenIP, + a.pushConfig.RpcConfig.RPC.RegisterIP, a.pushConfig.RpcConfig.RPC.AutoSetPorts, a.pushConfig.RpcConfig.RPC.Ports, a.Index(), a.pushConfig.Share.RpcRegisterName.Push, &a.pushConfig.Share, a.pushConfig, push.Start) } diff --git a/pkg/common/cmd/third.go b/pkg/common/cmd/third.go index fa1d5d42dc..c5a21f91be 100644 --- a/pkg/common/cmd/third.go +++ b/pkg/common/cmd/third.go @@ -58,5 +58,6 @@ func (a *ThirdRpcCmd) Exec() error { func (a *ThirdRpcCmd) runE() error { return startrpc.Start(a.ctx, &a.thirdConfig.Discovery, &a.thirdConfig.RpcConfig.Prometheus, a.thirdConfig.RpcConfig.RPC.ListenIP, + a.thirdConfig.RpcConfig.RPC.RegisterIP, a.thirdConfig.RpcConfig.RPC.AutoSetPorts, a.thirdConfig.RpcConfig.RPC.Ports, a.Index(), a.thirdConfig.Share.RpcRegisterName.Third, &a.thirdConfig.Share, a.thirdConfig, third.Start) } diff --git a/pkg/common/cmd/user.go b/pkg/common/cmd/user.go index 8a22bfce91..1d916ea377 100644 --- a/pkg/common/cmd/user.go +++ b/pkg/common/cmd/user.go @@ -59,5 +59,6 @@ func (a *UserRpcCmd) Exec() error { func (a *UserRpcCmd) runE() error { return startrpc.Start(a.ctx, &a.userConfig.Discovery, &a.userConfig.RpcConfig.Prometheus, a.userConfig.RpcConfig.RPC.ListenIP, + a.userConfig.RpcConfig.RPC.RegisterIP, a.userConfig.RpcConfig.RPC.AutoSetPorts, a.userConfig.RpcConfig.RPC.Ports, a.Index(), a.userConfig.Share.RpcRegisterName.User, &a.userConfig.Share, a.userConfig, user.Start) } diff --git a/pkg/common/config/config.go b/pkg/common/config/config.go index 5fb12b399b..ed803359c9 100644 --- a/pkg/common/config/config.go +++ b/pkg/common/config/config.go @@ -176,8 +176,9 @@ type Prometheus struct { type MsgGateway struct { RPC struct { - RegisterIP string `mapstructure:"registerIP"` - Ports []int `mapstructure:"ports"` + RegisterIP string `mapstructure:"registerIP"` + AutoSetPorts bool `mapstructure:"autoSetPorts"` + Ports []int `mapstructure:"ports"` } `mapstructure:"rpc"` Prometheus Prometheus `mapstructure:"prometheus"` ListenIP string `mapstructure:"listenIP"` @@ -195,8 +196,10 @@ type MsgTransfer struct { type Push struct { RPC struct { - RegisterIP string `mapstructure:"registerIP"` - ListenIP string `mapstructure:"listenIP"` + RegisterIP string `mapstructure:"registerIP"` + ListenIP string `mapstructure:"listenIP"` + AutoSetPorts bool `mapstructure:"autoSetPorts"` + Ports []int `mapstructure:"ports"` } `mapstructure:"rpc"` Prometheus Prometheus `mapstructure:"prometheus"` MaxConcurrentWorkers int `mapstructure:"maxConcurrentWorkers"` @@ -229,9 +232,10 @@ type Push struct { type Auth struct { RPC struct { - RegisterIP string `mapstructure:"registerIP"` - ListenIP string `mapstructure:"listenIP"` - Ports []int `mapstructure:"ports"` + RegisterIP string `mapstructure:"registerIP"` + ListenIP string `mapstructure:"listenIP"` + AutoSetPorts bool `mapstructure:"autoSetPorts"` + Ports []int `mapstructure:"ports"` } `mapstructure:"rpc"` Prometheus Prometheus `mapstructure:"prometheus"` TokenPolicy struct { @@ -241,27 +245,30 @@ type Auth struct { type Conversation struct { RPC struct { - RegisterIP string `mapstructure:"registerIP"` - ListenIP string `mapstructure:"listenIP"` - Ports []int `mapstructure:"ports"` + RegisterIP string `mapstructure:"registerIP"` + ListenIP string `mapstructure:"listenIP"` + AutoSetPorts bool `mapstructure:"autoSetPorts"` + Ports []int `mapstructure:"ports"` } `mapstructure:"rpc"` Prometheus Prometheus `mapstructure:"prometheus"` } type Friend struct { RPC struct { - RegisterIP string `mapstructure:"registerIP"` - ListenIP string `mapstructure:"listenIP"` - Ports []int `mapstructure:"ports"` + RegisterIP string `mapstructure:"registerIP"` + ListenIP string `mapstructure:"listenIP"` + AutoSetPorts bool `mapstructure:"autoSetPorts"` + Ports []int `mapstructure:"ports"` } `mapstructure:"rpc"` Prometheus Prometheus `mapstructure:"prometheus"` } type Group struct { RPC struct { - RegisterIP string `mapstructure:"registerIP"` - ListenIP string `mapstructure:"listenIP"` - Ports []int `mapstructure:"ports"` + RegisterIP string `mapstructure:"registerIP"` + ListenIP string `mapstructure:"listenIP"` + AutoSetPorts bool `mapstructure:"autoSetPorts"` + Ports []int `mapstructure:"ports"` } `mapstructure:"rpc"` Prometheus Prometheus `mapstructure:"prometheus"` EnableHistoryForNewMembers bool `mapstructure:"enableHistoryForNewMembers"` @@ -269,9 +276,10 @@ type Group struct { type Msg struct { RPC struct { - RegisterIP string `mapstructure:"registerIP"` - ListenIP string `mapstructure:"listenIP"` - Ports []int `mapstructure:"ports"` + RegisterIP string `mapstructure:"registerIP"` + ListenIP string `mapstructure:"listenIP"` + AutoSetPorts bool `mapstructure:"autoSetPorts"` + Ports []int `mapstructure:"ports"` } `mapstructure:"rpc"` Prometheus Prometheus `mapstructure:"prometheus"` FriendVerify bool `mapstructure:"friendVerify"` @@ -279,9 +287,10 @@ type Msg struct { type Third struct { RPC struct { - RegisterIP string `mapstructure:"registerIP"` - ListenIP string `mapstructure:"listenIP"` - Ports []int `mapstructure:"ports"` + RegisterIP string `mapstructure:"registerIP"` + ListenIP string `mapstructure:"listenIP"` + AutoSetPorts bool `mapstructure:"autoSetPorts"` + Ports []int `mapstructure:"ports"` } `mapstructure:"rpc"` Prometheus Prometheus `mapstructure:"prometheus"` Object struct { @@ -328,9 +337,10 @@ type Kodo struct { type User struct { RPC struct { - RegisterIP string `mapstructure:"registerIP"` - ListenIP string `mapstructure:"listenIP"` - Ports []int `mapstructure:"ports"` + RegisterIP string `mapstructure:"registerIP"` + ListenIP string `mapstructure:"listenIP"` + AutoSetPorts bool `mapstructure:"autoSetPorts"` + Ports []int `mapstructure:"ports"` } `mapstructure:"rpc"` Prometheus Prometheus `mapstructure:"prometheus"` } diff --git a/pkg/common/startrpc/start.go b/pkg/common/startrpc/start.go index 5c42586bf8..26fbb0ffaf 100644 --- a/pkg/common/startrpc/start.go +++ b/pkg/common/startrpc/start.go @@ -16,6 +16,7 @@ package startrpc import ( "context" + "errors" "fmt" "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/tools/utils/datautil" @@ -27,7 +28,6 @@ import ( "syscall" "time" - "github.com/openimsdk/open-im-server/v3/internal/tools/addr" kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister" "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" "github.com/openimsdk/tools/discovery" @@ -41,11 +41,60 @@ import ( ) // Start rpc server. -func Start[T any](ctx context.Context, discovery *config.Discovery, prometheusConfig *config.Prometheus, listenIP string, - index int, rpcRegisterName string, share *config.Share, config T, rpcFn func(ctx context.Context, - config T, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error, options ...grpc.ServerOption) error { +func Start[T any](ctx context.Context, discovery *config.Discovery, prometheusConfig *config.Prometheus, listenIP, + registerIP string, autoSetPorts bool, rpcPorts []int, index int, rpcRegisterName string, share *config.Share, config T, rpcFn func(ctx context.Context, + config T, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error, options ...grpc.ServerOption) error { + + var ( + rpcTcpAddr string + netDone = make(chan struct{}, 2) + netErr error + ) + + if !autoSetPorts { + rpcPort, err := datautil.GetElemByIndex(rpcPorts, index) + if err != nil { + return err + } + rpcTcpAddr = net.JoinHostPort(network.GetListenIP(listenIP), strconv.Itoa(rpcPort)) + } else { + rpcTcpAddr = net.JoinHostPort(network.GetListenIP(listenIP), "0") + } + + // var reg *prometheus.Registry + // var metric *grpcprometheus.ServerMetrics + if prometheusConfig.Enable { + // cusMetrics := prommetrics.GetGrpcCusMetrics(rpcRegisterName, share) + // reg, metric, _ = prommetrics.NewGrpcPromObj(cusMetrics) + // options = append(options, mw.GrpcServer(), grpc.StreamInterceptor(metric.StreamServerInterceptor()), + // grpc.UnaryInterceptor(metric.UnaryServerInterceptor())) + options = append( + options, mw.GrpcServer(), + prommetricsUnaryInterceptor(rpcRegisterName), + prommetricsStreamInterceptor(rpcRegisterName), + ) + prometheusPort, err := datautil.GetElemByIndex(prometheusConfig.Ports, index) + if err != nil { + return err + } + cs := prommetrics.GetGrpcCusMetrics(rpcRegisterName, share) + go func() { + if err := prommetrics.RpcInit(cs, prometheusPort); err != nil && !errors.Is(err, http.ErrServerClosed) { + netErr = errs.WrapMsg(err, fmt.Sprintf("rpc %s prometheus start err: %d", rpcRegisterName, prometheusPort)) + netDone <- struct{}{} + } + // metric.InitializeMetrics(srv) + // Create a HTTP server for prometheus. + // httpServer = &http.Server{Handler: promhttp.HandlerFor(reg, promhttp.HandlerOpts{}), Addr: fmt.Sprintf("0.0.0.0:%d", prometheusPort)} + // if err := httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed { + // netErr = errs.WrapMsg(err, "prometheus start err", httpServer.Addr) + // netDone <- struct{}{} + // } + }() + } else { + options = append(options, mw.GrpcServer()) + } - rpcTcpAddr := net.JoinHostPort(network.GetListenIP(listenIP), "0") listener, err := net.Listen( "tcp", rpcTcpAddr, @@ -54,8 +103,8 @@ func Start[T any](ctx context.Context, discovery *config.Discovery, prometheusCo return errs.WrapMsg(err, "listen err", "rpcTcpAddr", rpcTcpAddr) } - h, portStr, _ := net.SplitHostPort(listener.Addr().String()) - host, _ := addr.Extract(h) + _, portStr, _ := net.SplitHostPort(listener.Addr().String()) + registerIP = network.GetListenIP(registerIP) port, _ := strconv.Atoi(portStr) log.CInfo(ctx, "RPC server is initializing", "rpcRegisterName", rpcRegisterName, "rpcPort", portStr, @@ -70,22 +119,6 @@ func Start[T any](ctx context.Context, discovery *config.Discovery, prometheusCo defer client.Close() client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin"))) - // var reg *prometheus.Registry - // var metric *grpcprometheus.ServerMetrics - if prometheusConfig.Enable { - // cusMetrics := prommetrics.GetGrpcCusMetrics(rpcRegisterName, share) - // reg, metric, _ = prommetrics.NewGrpcPromObj(cusMetrics) - // options = append(options, mw.GrpcServer(), grpc.StreamInterceptor(metric.StreamServerInterceptor()), - // grpc.UnaryInterceptor(metric.UnaryServerInterceptor())) - options = append( - options, mw.GrpcServer(), - prommetricsUnaryInterceptor(rpcRegisterName), - prommetricsStreamInterceptor(rpcRegisterName), - ) - } else { - options = append(options, mw.GrpcServer()) - } - srv := grpc.NewServer(options...) err = rpcFn(ctx, config, client, srv) @@ -95,7 +128,7 @@ func Start[T any](ctx context.Context, discovery *config.Discovery, prometheusCo err = client.Register( rpcRegisterName, - host, + registerIP, port, grpc.WithTransportCredentials(insecure.NewCredentials()), ) @@ -103,33 +136,6 @@ func Start[T any](ctx context.Context, discovery *config.Discovery, prometheusCo return err } - var ( - netDone = make(chan struct{}, 2) - netErr error - ) - if prometheusConfig.Enable { - go func() { - prometheusPort, err := datautil.GetElemByIndex(prometheusConfig.Ports, index) - if err != nil { - netErr = err - netDone <- struct{}{} - return - } - cs := prommetrics.GetGrpcCusMetrics(rpcRegisterName, share) - if err := prommetrics.RpcInit(cs, prometheusPort); err != nil && err != http.ErrServerClosed { - netErr = errs.WrapMsg(err, fmt.Sprintf("rpc %s prometheus start err: %d", rpcRegisterName, prometheusPort)) - netDone <- struct{}{} - } - // metric.InitializeMetrics(srv) - // Create a HTTP server for prometheus. - // httpServer = &http.Server{Handler: promhttp.HandlerFor(reg, promhttp.HandlerOpts{}), Addr: fmt.Sprintf("0.0.0.0:%d", prometheusPort)} - // if err := httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed { - // netErr = errs.WrapMsg(err, "prometheus start err", httpServer.Addr) - // netDone <- struct{}{} - // } - }() - } - go func() { err := srv.Serve(listener) if err != nil {