diff --git a/go.mod b/go.mod index 47484fdc85..f0fdbaf7d2 100644 --- a/go.mod +++ b/go.mod @@ -42,7 +42,6 @@ require ( github.com/spf13/viper v1.18.2 github.com/stathat/consistent v1.0.0 go.uber.org/automaxprocs v1.5.3 - golang.org/x/exp v0.0.0-20230905200255-921286631fa9 golang.org/x/sync v0.8.0 ) @@ -173,6 +172,7 @@ require ( go.uber.org/atomic v1.9.0 // indirect go.uber.org/multierr v1.11.0 // indirect golang.org/x/arch v0.7.0 // indirect + golang.org/x/exp v0.0.0-20230905200255-921286631fa9 // indirect golang.org/x/image v0.15.0 // indirect golang.org/x/net v0.29.0 // indirect golang.org/x/oauth2 v0.23.0 // indirect diff --git a/go.sum b/go.sum index dab16222c0..b913944846 100644 --- a/go.sum +++ b/go.sum @@ -317,8 +317,8 @@ github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE= github.com/onsi/ginkgo v1.16.5/go.mod h1:+E8gABHa3K6zRBolWtd+ROzc/U5bkGt0FwiG042wbpU= github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y= github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM= -github.com/openimsdk/gomake v0.0.15-alpha.2 h1:5Q8yl8ezy2yx+q8/ucU/t4kJnDfCzNOrkXcDACCqtyM= -github.com/openimsdk/gomake v0.0.15-alpha.2/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= +github.com/openimsdk/gomake v0.0.14-alpha.5 h1:VY9c5x515lTfmdhhPjMvR3BBRrRquAUCFsz7t7vbv7Y= +github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= github.com/openimsdk/protocol v0.0.72-alpha.68 h1:Ekn6S9Ftt12Xs/p9kJ39RDr2gSwIczz+MmSHQE4lAek= github.com/openimsdk/protocol v0.0.72-alpha.68/go.mod h1:Iet+piS/jaS+kWWyj6EEr36mk4ISzIRYjoMSVA4dq2M= github.com/openimsdk/tools v0.0.50-alpha.62 h1:e/m1XL7+EXbkOoxr/En/612WcOPKOUHPBj0++gG6MuQ= diff --git a/internal/api/auth.go b/internal/api/auth.go index 01f97bfa97..92d911b712 100644 --- a/internal/api/auth.go +++ b/internal/api/auth.go @@ -16,7 +16,6 @@ package api import ( "github.com/gin-gonic/gin" - "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" "github.com/openimsdk/protocol/auth" "github.com/openimsdk/tools/a2r" ) diff --git a/internal/api/conversation.go b/internal/api/conversation.go index 0e6650cc10..f7dbc133c1 100644 --- a/internal/api/conversation.go +++ b/internal/api/conversation.go @@ -16,7 +16,6 @@ package api import ( "github.com/gin-gonic/gin" - "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" "github.com/openimsdk/protocol/conversation" "github.com/openimsdk/tools/a2r" ) diff --git a/internal/api/friend.go b/internal/api/friend.go index 1b123e2631..7d84ff0dca 100644 --- a/internal/api/friend.go +++ b/internal/api/friend.go @@ -17,7 +17,6 @@ package api import ( "github.com/gin-gonic/gin" - "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" "github.com/openimsdk/protocol/relation" "github.com/openimsdk/tools/a2r" ) diff --git a/internal/api/group.go b/internal/api/group.go index 0e5f0e0cda..97b6b73f05 100644 --- a/internal/api/group.go +++ b/internal/api/group.go @@ -16,7 +16,6 @@ package api import ( "github.com/gin-gonic/gin" - "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" "github.com/openimsdk/protocol/group" "github.com/openimsdk/tools/a2r" ) diff --git a/internal/api/init.go b/internal/api/init.go index 7a00ef203b..2a388655cd 100644 --- a/internal/api/init.go +++ b/internal/api/init.go @@ -30,7 +30,7 @@ import ( kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister" "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" - "github.com/openimsdk/tools/discovery/etcd" + "github.com/openimsdk/tools/discovery" "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/log" "github.com/openimsdk/tools/system/program" @@ -55,7 +55,6 @@ func Start(ctx context.Context, index int, config *Config) error { if err != nil { return errs.WrapMsg(err, "failed to register discovery service") } - client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin"))) var ( netDone = make(chan struct{}, 1) @@ -63,26 +62,6 @@ func Start(ctx context.Context, index int, config *Config) error { prometheusPort int ) - registerIP, err := network.GetRpcRegisterIP("") - if err != nil { - return err - } - - getAutoPort := func() (net.Listener, int, error) { - registerAddr := net.JoinHostPort(registerIP, "0") - listener, err := net.Listen("tcp", registerAddr) - if err != nil { - return nil, 0, errs.WrapMsg(err, "listen err", "registerAddr", registerAddr) - } - _, portStr, _ := net.SplitHostPort(listener.Addr().String()) - port, _ := strconv.Atoi(portStr) - return listener, port, nil - } - - if config.API.Prometheus.AutoSetPorts && config.Discovery.Enable != conf.ETCD { - return errs.New("only etcd support autoSetPorts", "RegisterName", "api").Wrap() - } - router, err := newGinRouter(ctx, client, config) if err != nil { return err diff --git a/internal/api/jssdk/jssdk.go b/internal/api/jssdk/jssdk.go index 2691f51416..3c09112077 100644 --- a/internal/api/jssdk/jssdk.go +++ b/internal/api/jssdk/jssdk.go @@ -13,7 +13,6 @@ import ( "github.com/openimsdk/protocol/sdkws" "github.com/openimsdk/tools/mcontext" "github.com/openimsdk/tools/utils/datautil" - "sort" ) const ( @@ -21,22 +20,23 @@ const ( defaultGetActiveConversation = 100 ) -func NewJSSdkApi(user user.UserClient, friend relation.FriendClient, group group.GroupClient, msg msg.MsgClient, conv conversation.ConversationClient) *JSSdk { +func NewJSSdkApi(userClient *rpcli.UserClient, relationClient *rpcli.RelationClient, groupClient *rpcli.GroupClient, + conversationClient *rpcli.ConversationClient, msgClient *rpcli.MsgClient) *JSSdk { return &JSSdk{ - user: user, - friend: friend, - group: group, - msg: msg, - conv: conv, + userClient: userClient, + relationClient: relationClient, + groupClient: groupClient, + conversationClient: conversationClient, + msgClient: msgClient, } } type JSSdk struct { - userClient rpcli.UserClient - relationClient rpcli.RelationClient - groupClient rpcli.GroupClient - conversationClient rpcli.ConversationClient - msgClient rpcli.MsgClient + userClient *rpcli.UserClient + relationClient *rpcli.RelationClient + groupClient *rpcli.GroupClient + conversationClient *rpcli.ConversationClient + msgClient *rpcli.MsgClient } func (x *JSSdk) GetActiveConversations(c *gin.Context) { diff --git a/internal/api/router.go b/internal/api/router.go index 34714a1367..9b3fac24ff 100644 --- a/internal/api/router.go +++ b/internal/api/router.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "github.com/openimsdk/open-im-server/v3/pkg/rpcli" + pbAuth "github.com/openimsdk/protocol/auth" "github.com/openimsdk/protocol/conversation" "github.com/openimsdk/protocol/group" "github.com/openimsdk/protocol/msg" @@ -23,12 +24,8 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" - "net/http" - "strings" - "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" "github.com/openimsdk/open-im-server/v3/pkg/common/servererrs" - "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" "github.com/openimsdk/protocol/constant" "github.com/openimsdk/tools/apiresp" "github.com/openimsdk/tools/discovery" @@ -61,31 +58,31 @@ func prommetricsGin() gin.HandlerFunc { func newGinRouter(ctx context.Context, client discovery.SvcDiscoveryRegistry, config *Config) (*gin.Engine, error) { client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin"))) - authConn, err := client.GetConn(ctx, config.Discovery.RpcService.Auth) + authConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.Auth) if err != nil { return nil, err } - userConn, err := client.GetConn(ctx, config.Discovery.RpcService.User) + userConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.User) if err != nil { return nil, err } - groupConn, err := client.GetConn(ctx, config.Discovery.RpcService.Group) + groupConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.Group) if err != nil { return nil, err } - friendConn, err := client.GetConn(ctx, config.Discovery.RpcService.Friend) + friendConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.Friend) if err != nil { return nil, err } - conversationConn, err := client.GetConn(ctx, config.Discovery.RpcService.Conversation) + conversationConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.Conversation) if err != nil { return nil, err } - thirdConn, err := client.GetConn(ctx, config.Discovery.RpcService.Third) + thirdConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.Third) if err != nil { return nil, err } - msgConn, err := client.GetConn(ctx, config.Discovery.RpcService.Msg) + msgConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.Msg) if err != nil { return nil, err } @@ -94,15 +91,6 @@ func newGinRouter(ctx context.Context, client discovery.SvcDiscoveryRegistry, co if v, ok := binding.Validator.Engine().(*validator.Validate); ok { _ = v.RegisterValidation("required_if", RequiredIf) } - // init rpc client here - userRpc := rpcclient.NewUser(disCov, config.Share.RpcRegisterName.User, config.Share.RpcRegisterName.MessageGateway, - config.Share.IMAdminUserID) - groupRpc := rpcclient.NewGroup(disCov, config.Share.RpcRegisterName.Group) - friendRpc := rpcclient.NewFriend(disCov, config.Share.RpcRegisterName.Friend) - messageRpc := rpcclient.NewMessage(disCov, config.Share.RpcRegisterName.Msg) - conversationRpc := rpcclient.NewConversation(disCov, config.Share.RpcRegisterName.Conversation) - authRpc := rpcclient.NewAuth(disCov, config.Share.RpcRegisterName.Auth) - thirdRpc := rpcclient.NewThird(disCov, config.Share.RpcRegisterName.Third, config.API.Prometheus.GrafanaURL) switch config.API.Api.CompressionLevel { case NoCompression: case DefaultCompression: @@ -112,10 +100,10 @@ func newGinRouter(ctx context.Context, client discovery.SvcDiscoveryRegistry, co case BestSpeed: r.Use(gzip.Gzip(gzip.BestSpeed)) } - r.Use(prommetricsGin(), gin.RecoveryWithWriter(gin.DefaultErrorWriter, mw.GinPanicErr), mw.CorsHandler(), mw.GinParseOperationID(), GinParseToken(rpcli.NewAuthClient(authConn))) - j := jssdk.NewJSSdkApi() + r.Use(prommetricsGin(), gin.RecoveryWithWriter(gin.DefaultErrorWriter, mw.GinPanicErr), mw.CorsHandler(), + mw.GinParseOperationID(), GinParseToken(rpcli.NewAuthClient(authConn))) - u := NewUserApi(user.NewUserClient(userConn), client, config.Discovery.RpcService) + u := NewUserApi(user.NewUserClient(userConn), client, config.Share.RpcRegisterName) { userRouterGroup := r.Group("/user") userRouterGroup.POST("/user_register", u.UserRegister) @@ -290,25 +278,12 @@ func newGinRouter(ctx context.Context, client discovery.SvcDiscoveryRegistry, co } { + j := jssdk.NewJSSdkApi(rpcli.NewUserClient(userConn), rpcli.NewRelationClient(friendConn), + rpcli.NewGroupClient(groupConn), rpcli.NewConversationClient(conversationConn), rpcli.NewMsgClient(msgConn)) jssdk := r.Group("/jssdk") jssdk.POST("/get_conversations", j.GetConversations) jssdk.POST("/get_active_conversations", j.GetActiveConversations) } - { - pd := NewPrometheusDiscoveryApi(config, client) - proDiscoveryGroup := r.Group("/prometheus_discovery", pd.Enable) - proDiscoveryGroup.GET("/api", pd.Api) - proDiscoveryGroup.GET("/user", pd.User) - proDiscoveryGroup.GET("/group", pd.Group) - proDiscoveryGroup.GET("/msg", pd.Msg) - proDiscoveryGroup.GET("/friend", pd.Friend) - proDiscoveryGroup.GET("/conversation", pd.Conversation) - proDiscoveryGroup.GET("/third", pd.Third) - proDiscoveryGroup.GET("/auth", pd.Auth) - proDiscoveryGroup.GET("/push", pd.Push) - proDiscoveryGroup.GET("/msg_gateway", pd.MessageGateway) - proDiscoveryGroup.GET("/msg_transfer", pd.MessageTransfer) - } return r, nil } diff --git a/internal/api/statistics.go b/internal/api/statistics.go deleted file mode 100644 index f5ee99f733..0000000000 --- a/internal/api/statistics.go +++ /dev/null @@ -1,32 +0,0 @@ -// Copyright © 2023 OpenIM. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package api - -import ( - "github.com/gin-gonic/gin" - "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" - "github.com/openimsdk/protocol/user" - "github.com/openimsdk/tools/a2r" -) - -type StatisticsApi rpcclient.User - -func NewStatisticsApi(client rpcclient.User) StatisticsApi { - return StatisticsApi(client) -} - -func (s *StatisticsApi) UserRegister(c *gin.Context) { - a2r.Call(user.UserClient.UserRegisterCount, s.Client, c) -} diff --git a/internal/api/third.go b/internal/api/third.go index fcf1f9aa84..7ecb329112 100644 --- a/internal/api/third.go +++ b/internal/api/third.go @@ -24,7 +24,6 @@ import ( "strings" "github.com/gin-gonic/gin" - "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" "github.com/openimsdk/protocol/third" "github.com/openimsdk/tools/a2r" "github.com/openimsdk/tools/errs" diff --git a/internal/api/user.go b/internal/api/user.go index 92f04cccf8..154e6d9082 100644 --- a/internal/api/user.go +++ b/internal/api/user.go @@ -22,6 +22,7 @@ import ( "github.com/openimsdk/protocol/user" "github.com/openimsdk/tools/a2r" "github.com/openimsdk/tools/apiresp" + "github.com/openimsdk/tools/discovery" "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/log" ) @@ -29,10 +30,10 @@ import ( type UserApi struct { Client user.UserClient discov discovery.SvcDiscoveryRegistry - config config.RpcService + config config.RpcRegisterName } -func NewUserApi(client user.UserClient, discov discovery.SvcDiscoveryRegistry, config config.RpcService) UserApi { +func NewUserApi(client user.UserClient, discov discovery.SvcDiscoveryRegistry, config config.RpcRegisterName) UserApi { return UserApi{Client: client, discov: discov, config: config} } diff --git a/internal/msggateway/client.go b/internal/msggateway/client.go index b90390e802..1040f2be23 100644 --- a/internal/msggateway/client.go +++ b/internal/msggateway/client.go @@ -18,7 +18,6 @@ import ( "context" "encoding/json" "fmt" - "runtime/debug" "sync" "sync/atomic" "time" diff --git a/internal/msggateway/hub_server.go b/internal/msggateway/hub_server.go index 4066e9a9e6..ce63bc1b93 100644 --- a/internal/msggateway/hub_server.go +++ b/internal/msggateway/hub_server.go @@ -22,7 +22,6 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/authverify" "github.com/openimsdk/open-im-server/v3/pkg/common/servererrs" "github.com/openimsdk/open-im-server/v3/pkg/common/startrpc" - "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" "github.com/openimsdk/protocol/constant" "github.com/openimsdk/protocol/msggateway" "github.com/openimsdk/protocol/sdkws" @@ -36,7 +35,7 @@ import ( ) func (s *Server) InitServer(ctx context.Context, config *Config, disCov discovery.SvcDiscoveryRegistry, server *grpc.Server) error { - userConn, err := disCov.GetConn(ctx, config.Discovery.RpcService.User) + userConn, err := disCov.GetConn(ctx, config.Share.RpcRegisterName.User) if err != nil { return err } @@ -45,7 +44,6 @@ func (s *Server) InitServer(ctx context.Context, config *Config, disCov discover return err } msggateway.RegisterMsgGatewayServer(server, s) - s.userRcp = rpcclient.NewUserRpcClient(disCov, config.Share.RpcRegisterName.User, config.Share.IMAdminUserID) if s.ready != nil { return s.ready(s) } @@ -70,7 +68,6 @@ type Server struct { config *Config pushTerminal map[int]struct{} ready func(srv *Server) error - userRcp rpcclient.UserRpcClient queue *memamq.MemoryQueue userClient *rpcli.UserClient } @@ -79,9 +76,8 @@ func (s *Server) SetLongConnServer(LongConnServer LongConnServer) { s.LongConnServer = LongConnServer } -func NewServer(rpcPort int, longConnServer LongConnServer, conf *Config, ready func(srv *Server) error) *Server { +func NewServer(longConnServer LongConnServer, conf *Config, ready func(srv *Server) error) *Server { s := &Server{ - rpcPort: rpcPort, LongConnServer: longConnServer, pushTerminal: make(map[int]struct{}), config: conf, diff --git a/internal/msggateway/init.go b/internal/msggateway/init.go index e7687becf3..ed7e04b7b9 100644 --- a/internal/msggateway/init.go +++ b/internal/msggateway/init.go @@ -41,10 +41,6 @@ func Start(ctx context.Context, index int, conf *Config) error { if err != nil { return err } - rpcPort, err := datautil.GetElemByIndex(conf.MsgGateway.RPC.Ports, index) - if err != nil { - return err - } rdb, err := redisutil.NewRedisClient(ctx, conf.RedisConfig.Build()) if err != nil { return err diff --git a/internal/msggateway/message_handler.go b/internal/msggateway/message_handler.go index 7b94f97400..9b59867d61 100644 --- a/internal/msggateway/message_handler.go +++ b/internal/msggateway/message_handler.go @@ -23,12 +23,9 @@ import ( "github.com/go-playground/validator/v10" "google.golang.org/protobuf/proto" - "github.com/openimsdk/open-im-server/v3/pkg/common/config" - "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" "github.com/openimsdk/protocol/msg" "github.com/openimsdk/protocol/push" "github.com/openimsdk/protocol/sdkws" - "github.com/openimsdk/tools/discovery" "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/utils/jsonutil" ) diff --git a/internal/msggateway/ws_server.go b/internal/msggateway/ws_server.go index b6b518feca..208289e9ef 100644 --- a/internal/msggateway/ws_server.go +++ b/internal/msggateway/ws_server.go @@ -9,24 +9,18 @@ import ( "sync/atomic" "time" - "github.com/openimsdk/open-im-server/v3/pkg/common/webhook" - "github.com/openimsdk/open-im-server/v3/pkg/rpccache" - pbAuth "github.com/openimsdk/protocol/auth" - "github.com/openimsdk/tools/mcontext" - "net/http" - "sync" - "sync/atomic" - "time" - "github.com/go-playground/validator/v10" "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" "github.com/openimsdk/open-im-server/v3/pkg/common/servererrs" - "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" + "github.com/openimsdk/open-im-server/v3/pkg/common/webhook" + "github.com/openimsdk/open-im-server/v3/pkg/rpccache" + pbAuth "github.com/openimsdk/protocol/auth" "github.com/openimsdk/protocol/constant" "github.com/openimsdk/protocol/msggateway" "github.com/openimsdk/tools/discovery" "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/log" + "github.com/openimsdk/tools/mcontext" "github.com/openimsdk/tools/utils/stringutil" "golang.org/x/sync/errgroup" ) @@ -62,8 +56,6 @@ type WsServer struct { handshakeTimeout time.Duration writeBufferSize int validate *validator.Validate - userClient *rpcclient.UserRpcClient - authClient *rpcclient.Auth disCov discovery.SvcDiscoveryRegistry Compressor //Encoder @@ -80,19 +72,19 @@ type kickHandler struct { } func (ws *WsServer) SetDiscoveryRegistry(ctx context.Context, disCov discovery.SvcDiscoveryRegistry, config *Config) error { - userConn, err := disCov.GetConn(ctx, config.Discovery.RpcService.User) + userConn, err := disCov.GetConn(ctx, config.Share.RpcRegisterName.User) if err != nil { return err } - pushConn, err := disCov.GetConn(ctx, config.Discovery.RpcService.Push) + pushConn, err := disCov.GetConn(ctx, config.Share.RpcRegisterName.Push) if err != nil { return err } - authConn, err := disCov.GetConn(ctx, config.Discovery.RpcService.Auth) + authConn, err := disCov.GetConn(ctx, config.Share.RpcRegisterName.Auth) if err != nil { return err } - msgConn, err := disCov.GetConn(ctx, config.Discovery.RpcService.Msg) + msgConn, err := disCov.GetConn(ctx, config.Share.RpcRegisterName.Msg) if err != nil { return err } diff --git a/internal/msgtransfer/init.go b/internal/msgtransfer/init.go index 19619c6a3f..42612e2948 100644 --- a/internal/msgtransfer/init.go +++ b/internal/msgtransfer/init.go @@ -23,11 +23,6 @@ import ( "os/signal" "syscall" - "github.com/openimsdk/tools/discovery" - "github.com/openimsdk/tools/discovery/etcd" - "github.com/openimsdk/tools/utils/jsonutil" - "github.com/openimsdk/tools/utils/network" - "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/redis" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database/mgo" @@ -38,7 +33,6 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/config" discRegister "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller" - "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/log" "github.com/openimsdk/tools/mw" @@ -87,20 +81,6 @@ func Start(ctx context.Context, index int, config *Config) error { client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin"))) - if config.Discovery.Enable == conf.ETCD { - cm := disetcd.NewConfigManager(client.(*etcd.SvcDiscoveryRegistryImpl).GetClient(), []string{ - config.MsgTransfer.GetConfigFileName(), - config.RedisConfig.GetConfigFileName(), - config.MongodbConfig.GetConfigFileName(), - config.KafkaConfig.GetConfigFileName(), - config.Share.GetConfigFileName(), - config.WebhooksConfig.GetConfigFileName(), - config.Discovery.GetConfigFileName(), - conf.LogConfigFileName, - }) - cm.Watch(ctx) - } - msgDocModel, err := mgo.NewMsgMongo(mgocli.GetDB()) if err != nil { return err @@ -153,11 +133,6 @@ func (m *MsgTransfer) Start(index int, config *Config) error { if config.MsgTransfer.Prometheus.Enable { go func() { - defer func() { - if r := recover(); r != nil { - log.ZPanic(m.ctx, "MsgTransfer Start Panic", errs.ErrPanic(r)) - } - }() prometheusPort, err := datautil.GetElemByIndex(config.MsgTransfer.Prometheus.Ports, index) if err != nil { netErr = err diff --git a/internal/msgtransfer/online_history_msg_handler.go b/internal/msgtransfer/online_history_msg_handler.go index 9efbfb5ff3..fce0297ee2 100644 --- a/internal/msgtransfer/online_history_msg_handler.go +++ b/internal/msgtransfer/online_history_msg_handler.go @@ -29,10 +29,8 @@ import ( "github.com/IBM/sarama" "github.com/go-redis/redis" - "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller" "github.com/openimsdk/open-im-server/v3/pkg/msgprocessor" - "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" "github.com/openimsdk/open-im-server/v3/pkg/tools/batcher" "github.com/openimsdk/protocol/constant" pbconv "github.com/openimsdk/protocol/conversation" @@ -72,8 +70,6 @@ type OnlineHistoryRedisConsumerHandler struct { redisMessageBatches *batcher.Batcher[sarama.ConsumerMessage] msgTransferDatabase controller.MsgTransferDatabase - conversationRpcClient *rpcclient.ConversationRpcClient - groupRpcClient *rpcclient.GroupRpcClient conversationUserHasReadChan chan *userHasReadSeq wg sync.WaitGroup @@ -87,11 +83,11 @@ func NewOnlineHistoryRedisConsumerHandler(ctx context.Context, client discovery. if err != nil { return nil, err } - groupConn, err := client.GetConn(ctx, config.Discovery.RpcService.Group) + groupConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.Group) if err != nil { return nil, err } - conversationConn, err := client.GetConn(ctx, config.Discovery.RpcService.Conversation) + conversationConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.Conversation) if err != nil { return nil, err } @@ -119,11 +115,9 @@ func NewOnlineHistoryRedisConsumerHandler(ctx context.Context, client discovery. } b.Do = och.do och.redisMessageBatches = b - och.conversationRpcClient = conversationRpcClient - och.groupRpcClient = groupRpcClient och.historyConsumerGroup = historyConsumerGroup - return &och, err + return &och, nil } func (och *OnlineHistoryRedisConsumerHandler) do(ctx context.Context, channelID int, val *batcher.Msg[sarama.ConsumerMessage]) { ctx = mcontext.WithTriggerIDContext(ctx, val.TriggerID()) diff --git a/internal/push/push_handler.go b/internal/push/push_handler.go index c1077be642..9defb59691 100644 --- a/internal/push/push_handler.go +++ b/internal/push/push_handler.go @@ -16,7 +16,6 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/webhook" "github.com/openimsdk/open-im-server/v3/pkg/msgprocessor" "github.com/openimsdk/open-im-server/v3/pkg/rpccache" - "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" "github.com/openimsdk/open-im-server/v3/pkg/util/conversationutil" "github.com/openimsdk/protocol/constant" "github.com/openimsdk/protocol/msggateway" @@ -41,9 +40,6 @@ type ConsumerHandler struct { onlineCache *rpccache.OnlineCache groupLocalCache *rpccache.GroupLocalCache conversationLocalCache *rpccache.ConversationLocalCache - msgRpcClient rpcclient.MessageRpcClient - conversationRpcClient rpcclient.ConversationRpcClient - groupRpcClient rpcclient.GroupRpcClient webhookClient *webhook.Client config *Config userClient *rpcli.UserClient @@ -61,19 +57,19 @@ func NewConsumerHandler(ctx context.Context, config *Config, database controller if err != nil { return nil, err } - userConn, err := client.GetConn(ctx, config.Discovery.RpcService.User) + userConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.User) if err != nil { return nil, err } - groupConn, err := client.GetConn(ctx, config.Discovery.RpcService.Group) + groupConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.Group) if err != nil { return nil, err } - msgConn, err := client.GetConn(ctx, config.Discovery.RpcService.Msg) + msgConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.Msg) if err != nil { return nil, err } - conversationConn, err := client.GetConn(ctx, config.Discovery.RpcService.Conversation) + conversationConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.Conversation) if err != nil { return nil, err } @@ -82,8 +78,6 @@ func NewConsumerHandler(ctx context.Context, config *Config, database controller consumerHandler.msgClient = rpcli.NewMsgClient(msgConn) consumerHandler.conversationClient = rpcli.NewConversationClient(conversationConn) - userRpcClient := rpcclient.NewUserRpcClient(client, config.Share.RpcRegisterName.User, config.Share.IMAdminUserID) - consumerHandler.offlinePusher = offlinePusher consumerHandler.onlinePusher = NewOnlinePusher(client, config) consumerHandler.groupLocalCache = rpccache.NewGroupLocalCache(consumerHandler.groupClient, &config.LocalCacheConfig, rdb) diff --git a/internal/rpc/auth/auth.go b/internal/rpc/auth/auth.go index d710b491b3..9df012e86c 100644 --- a/internal/rpc/auth/auth.go +++ b/internal/rpc/auth/auth.go @@ -42,7 +42,6 @@ import ( type authServer struct { pbauth.UnimplementedAuthServer authDatabase controller.AuthDatabase - userRpcClient *rpcclient.UserRpcClient RegisterCenter discovery.SvcDiscoveryRegistry config *Config userClient *rpcli.UserClient @@ -60,12 +59,11 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg if err != nil { return err } - userConn, err := client.GetConn(ctx, config.Discovery.RpcService.User) + userConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.User) if err != nil { return err } pbauth.RegisterAuthServer(server, &authServer{ - userRpcClient: &userRpcClient, RegisterCenter: client, authDatabase: controller.NewAuthDatabase( redis2.NewTokenCacheModel(rdb, config.RpcConfig.TokenPolicy.Expire), diff --git a/internal/rpc/conversation/conversation.go b/internal/rpc/conversation/conversation.go index 01204ed6a0..bfc9250085 100644 --- a/internal/rpc/conversation/conversation.go +++ b/internal/rpc/conversation/conversation.go @@ -45,9 +45,6 @@ import ( type conversationServer struct { pbconversation.UnimplementedConversationServer - msgRpcClient *rpcclient.MessageRpcClient - user *rpcclient.UserRpcClient - groupRpcClient *rpcclient.GroupRpcClient conversationDatabase controller.ConversationDatabase conversationNotificationSender *ConversationNotificationSender @@ -81,15 +78,15 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg if err != nil { return err } - userConn, err := client.GetConn(ctx, config.Discovery.RpcService.User) + userConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.User) if err != nil { return err } - groupConn, err := client.GetConn(ctx, config.Discovery.RpcService.Group) + groupConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.Group) if err != nil { return err } - msgConn, err := client.GetConn(ctx, config.Discovery.RpcService.Msg) + msgConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.Msg) if err != nil { return err } diff --git a/internal/rpc/group/group.go b/internal/rpc/group/group.go index 80c5649f71..649027fb81 100644 --- a/internal/rpc/group/group.go +++ b/internal/rpc/group/group.go @@ -103,15 +103,15 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg //msgRpcClient := rpcclient.NewMessageRpcClient(client, config.Share.RpcRegisterName.Msg) //conversationRpcClient := rpcclient.NewConversationRpcClient(client, config.Share.RpcRegisterName.Conversation) - userConn, err := client.GetConn(ctx, config.Discovery.RpcService.User) + userConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.User) if err != nil { return err } - msgConn, err := client.GetConn(ctx, config.Discovery.RpcService.Msg) + msgConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.Msg) if err != nil { return err } - conversationConn, err := client.GetConn(ctx, config.Discovery.RpcService.Conversation) + conversationConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.Conversation) if err != nil { return err } diff --git a/internal/rpc/group/notification.go b/internal/rpc/group/notification.go index 8246a23295..1aa5333b4f 100644 --- a/internal/rpc/group/notification.go +++ b/internal/rpc/group/notification.go @@ -42,7 +42,6 @@ import ( "github.com/openimsdk/tools/utils/datautil" "github.com/openimsdk/tools/utils/stringutil" "go.mongodb.org/mongo-driver/mongo" - "time" ) // GroupApplicationReceiver diff --git a/internal/rpc/msg/server.go b/internal/rpc/msg/server.go index a6bc7c15ea..74a6355d71 100644 --- a/internal/rpc/msg/server.go +++ b/internal/rpc/msg/server.go @@ -56,9 +56,8 @@ type Config struct { // MsgServer encapsulates dependencies required for message handling. type msgServer struct { msg.UnimplementedMsgServer - RegisterCenter discovery.SvcDiscoveryRegistry // Service discovery registry for service registration. - MsgDatabase controller.CommonMsgDatabase // Interface for message database operations. - StreamMsgDatabase controller.StreamMsgDatabase + RegisterCenter discovery.SvcDiscoveryRegistry // Service discovery registry for service registration. + MsgDatabase controller.CommonMsgDatabase // Interface for message database operations. UserLocalCache *rpccache.UserLocalCache // Local cache for user data. FriendLocalCache *rpccache.FriendLocalCache // Local cache for friend data. GroupLocalCache *rpccache.GroupLocalCache // Local cache for group data. @@ -104,25 +103,24 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg if err != nil { return err } - userConn, err := client.GetConn(ctx, config.Discovery.RpcService.User) + userConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.User) if err != nil { return err } - groupConn, err := client.GetConn(ctx, config.Discovery.RpcService.Group) + groupConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.Group) if err != nil { return err } - friendConn, err := client.GetConn(ctx, config.Discovery.RpcService.Friend) + friendConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.Friend) if err != nil { return err } - conversationConn, err := client.GetConn(ctx, config.Discovery.RpcService.Conversation) + conversationConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.Conversation) if err != nil { return err } conversationClient := rpcli.NewConversationClient(conversationConn) s := &msgServer{ - Conversation: &conversationClient, MsgDatabase: msgDatabase, RegisterCenter: client, UserLocalCache: rpccache.NewUserLocalCache(rpcli.NewUserClient(userConn), &config.LocalCacheConfig, rdb), @@ -155,11 +153,3 @@ func (m *msgServer) conversationAndGetRecvID(conversation *conversation.Conversa } return "" } - -func (m *msgServer) AppendStreamMsg(ctx context.Context, req *msg.AppendStreamMsgReq) (*msg.AppendStreamMsgResp, error) { - return nil, nil -} - -func (m *msgServer) GetStreamMsg(ctx context.Context, req *msg.GetStreamMsgReq) (*msg.GetStreamMsgResp, error) { - return nil, nil -} diff --git a/internal/rpc/msg/stream_msg.go b/internal/rpc/msg/stream_msg.go deleted file mode 100644 index 688d766c88..0000000000 --- a/internal/rpc/msg/stream_msg.go +++ /dev/null @@ -1,115 +0,0 @@ -package msg - -import ( - "context" - "fmt" - "time" - - "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" - "github.com/openimsdk/open-im-server/v3/pkg/msgprocessor" - "github.com/openimsdk/protocol/constant" - "github.com/openimsdk/protocol/msg" - "github.com/openimsdk/protocol/sdkws" - "github.com/openimsdk/tools/errs" -) - -const StreamDeadlineTime = time.Second * 60 * 10 - -func (m *msgServer) handlerStreamMsg(ctx context.Context, msgData *sdkws.MsgData) error { - now := time.Now() - val := &model.StreamMsg{ - ClientMsgID: msgData.ClientMsgID, - ConversationID: msgprocessor.GetConversationIDByMsg(msgData), - UserID: msgData.SendID, - CreateTime: now, - DeadlineTime: now.Add(StreamDeadlineTime), - } - return m.StreamMsgDatabase.CreateStreamMsg(ctx, val) -} - -func (m *msgServer) getStreamMsg(ctx context.Context, clientMsgID string) (*model.StreamMsg, error) { - res, err := m.StreamMsgDatabase.GetStreamMsg(ctx, clientMsgID) - if err != nil { - return nil, err - } - now := time.Now() - if !res.End && res.DeadlineTime.Before(now) { - res.End = true - res.DeadlineTime = now - _ = m.StreamMsgDatabase.AppendStreamMsg(ctx, res.ClientMsgID, 0, nil, true, now) - } - return res, nil -} - -func (m *msgServer) AppendStreamMsg(ctx context.Context, req *msg.AppendStreamMsgReq) (*msg.AppendStreamMsgResp, error) { - res, err := m.getStreamMsg(ctx, req.ClientMsgID) - if err != nil { - return nil, err - } - if res.End { - return nil, errs.ErrNoPermission.WrapMsg("stream msg is end") - } - if len(res.Packets) < int(req.StartIndex) { - return nil, errs.ErrNoPermission.WrapMsg("start index is invalid") - } - if val := len(res.Packets) - int(req.StartIndex); val > 0 { - exist := res.Packets[int(req.StartIndex):] - for i, s := range exist { - if len(req.Packets) == 0 { - break - } - if s != req.Packets[i] { - return nil, errs.ErrNoPermission.WrapMsg(fmt.Sprintf("packet %d has been written and is inconsistent", i)) - } - req.StartIndex++ - req.Packets = req.Packets[1:] - } - } - if len(req.Packets) == 0 && res.End == req.End { - return &msg.AppendStreamMsgResp{}, nil - } - deadlineTime := time.Now().Add(StreamDeadlineTime) - if err := m.StreamMsgDatabase.AppendStreamMsg(ctx, req.ClientMsgID, int(req.StartIndex), req.Packets, req.End, deadlineTime); err != nil { - return nil, err - } - conversation, err := m.conversationClient.GetConversation(ctx, res.ConversationID, res.UserID) - if err != nil { - return nil, err - } - tips := &sdkws.StreamMsgTips{ - ConversationID: res.ConversationID, - ClientMsgID: res.ClientMsgID, - StartIndex: req.StartIndex, - Packets: req.Packets, - End: req.End, - } - var ( - recvID string - sessionType int32 - ) - if conversation.GroupID == "" { - sessionType = constant.SingleChatType - recvID = conversation.UserID - } else { - sessionType = constant.ReadGroupChatType - recvID = conversation.GroupID - } - m.msgNotificationSender.StreamMsgNotification(ctx, res.UserID, recvID, sessionType, tips) - return &msg.AppendStreamMsgResp{}, nil -} - -func (m *msgServer) GetStreamMsg(ctx context.Context, req *msg.GetStreamMsgReq) (*msg.GetStreamMsgResp, error) { - res, err := m.getStreamMsg(ctx, req.ClientMsgID) - if err != nil { - return nil, err - } - return &msg.GetStreamMsgResp{ - ClientMsgID: res.ClientMsgID, - ConversationID: res.ConversationID, - UserID: res.UserID, - Packets: res.Packets, - End: res.End, - CreateTime: res.CreateTime.UnixMilli(), - DeadlineTime: res.DeadlineTime.UnixMilli(), - }, nil -} diff --git a/internal/rpc/relation/black.go b/internal/rpc/relation/black.go index ef55365cad..b795d62489 100644 --- a/internal/rpc/relation/black.go +++ b/internal/rpc/relation/black.go @@ -22,7 +22,6 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/authverify" "github.com/openimsdk/open-im-server/v3/pkg/common/convert" - "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" "github.com/openimsdk/protocol/relation" "github.com/openimsdk/protocol/sdkws" "github.com/openimsdk/tools/errs" diff --git a/internal/rpc/relation/friend.go b/internal/rpc/relation/friend.go index 79db149705..e9a5818edc 100644 --- a/internal/rpc/relation/friend.go +++ b/internal/rpc/relation/friend.go @@ -91,11 +91,11 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg return err } - userConn, err := client.GetConn(ctx, config.Discovery.RpcService.User) + userConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.User) if err != nil { return err } - msgConn, err := client.GetConn(ctx, config.Discovery.RpcService.Msg) + msgConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.Msg) if err != nil { return err } diff --git a/internal/rpc/third/third.go b/internal/rpc/third/third.go index 6469c90512..bc251ebd00 100644 --- a/internal/rpc/third/third.go +++ b/internal/rpc/third/third.go @@ -22,12 +22,9 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/redis" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database/mgo" "github.com/openimsdk/open-im-server/v3/pkg/localcache" - "time" - - "github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller" - "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" "github.com/openimsdk/protocol/third" "github.com/openimsdk/tools/db/mongoutil" "github.com/openimsdk/tools/db/redisutil" @@ -44,7 +41,6 @@ type thirdServer struct { third.UnimplementedThirdServer thirdDatabase controller.ThirdDatabase s3dataBase controller.S3Database - userRpcClient rpcclient.UserRpcClient defaultExpire time.Duration config *Config s3 s3.Interface @@ -100,14 +96,13 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg if err != nil { return err } - userConn, err := client.GetConn(ctx, config.Discovery.RpcService.User) + userConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.User) if err != nil { return err } localcache.InitLocalCache(&config.LocalCacheConfig) third.RegisterThirdServer(server, &thirdServer{ thirdDatabase: controller.NewThirdDatabase(redis.NewThirdCache(rdb), logdb), - userRpcClient: rpcclient.NewUserRpcClient(client, config.Share.RpcRegisterName.User, config.Share.IMAdminUserID), s3dataBase: controller.NewS3Database(rdb, o, s3db), defaultExpire: time.Hour * 24 * 7, config: config, diff --git a/internal/rpc/user/user.go b/internal/rpc/user/user.go index 687823eaa6..02b3dc81d1 100644 --- a/internal/rpc/user/user.go +++ b/internal/rpc/user/user.go @@ -40,7 +40,6 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/convert" "github.com/openimsdk/open-im-server/v3/pkg/common/servererrs" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller" - "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" "github.com/openimsdk/protocol/constant" "github.com/openimsdk/protocol/sdkws" pbuser "github.com/openimsdk/protocol/user" @@ -58,8 +57,6 @@ type userServer struct { db controller.UserDatabase friendNotificationSender *relation.FriendNotificationSender userNotificationSender *UserNotificationSender - friendRpcClient *rpcclient.FriendRpcClient - groupRpcClient *rpcclient.GroupRpcClient RegisterCenter registry.SvcDiscoveryRegistry config *Config webhookClient *webhook.Client @@ -97,24 +94,21 @@ func Start(ctx context.Context, config *Config, client registry.SvcDiscoveryRegi if err != nil { return err } - msgConn, err := client.GetConn(ctx, config.Discovery.RpcService.Msg) + msgConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.Msg) if err != nil { return err } - groupConn, err := client.GetConn(ctx, config.Discovery.RpcService.Group) + groupConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.Group) if err != nil { return err } - friendConn, err := client.GetConn(ctx, config.Discovery.RpcService.Friend) + friendConn, err := client.GetConn(ctx, config.Share.RpcRegisterName.Friend) if err != nil { return err } msgClient := rpcli.NewMsgClient(msgConn) userCache := redis.NewUserCacheRedis(rdb, &config.LocalCacheConfig, userDB, redis.GetRocksCacheOptions()) database := controller.NewUserDatabase(userDB, userCache, mgocli.GetTx()) - friendRpcClient := rpcclient.NewFriendRpcClient(client, config.Share.RpcRegisterName.Friend) - groupRpcClient := rpcclient.NewGroupRpcClient(client, config.Share.RpcRegisterName.Group) - msgRpcClient := rpcclient.NewMessageRpcClient(client, config.Share.RpcRegisterName.Msg) localcache.InitLocalCache(&config.LocalCacheConfig) u := &userServer{ online: redis.NewUserOnline(rdb), diff --git a/pkg/common/startrpc/start.go b/pkg/common/startrpc/start.go index 6776580895..31f1a060a3 100644 --- a/pkg/common/startrpc/start.go +++ b/pkg/common/startrpc/start.go @@ -28,14 +28,6 @@ import ( "syscall" "time" - conf "github.com/openimsdk/open-im-server/v3/pkg/common/config" - "github.com/openimsdk/tools/discovery/etcd" - "github.com/openimsdk/tools/utils/datautil" - "github.com/openimsdk/tools/utils/jsonutil" - "google.golang.org/grpc/status" - - "github.com/openimsdk/tools/utils/runtimeenv" - kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister" "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" "github.com/openimsdk/tools/discovery" diff --git a/pkg/notification/auth.go b/pkg/notification/auth.go deleted file mode 100644 index 05fec35a08..0000000000 --- a/pkg/notification/auth.go +++ /dev/null @@ -1,74 +0,0 @@ -// Copyright © 2023 OpenIM. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package rpcclient - -import ( - "context" - - "github.com/openimsdk/protocol/auth" - "github.com/openimsdk/tools/discovery" - "github.com/openimsdk/tools/system/program" - "google.golang.org/grpc" -) - -func NewAuth(discov discovery.SvcDiscoveryRegistry, rpcRegisterName string) *Auth { - conn, err := discov.GetConn(context.Background(), rpcRegisterName) - if err != nil { - program.ExitWithError(err) - } - client := auth.NewAuthClient(conn) - return &Auth{discov: discov, conn: conn, Client: client} -} - -type Auth struct { - conn grpc.ClientConnInterface - Client auth.AuthClient - discov discovery.SvcDiscoveryRegistry -} - -func (a *Auth) ParseToken(ctx context.Context, token string) (*auth.ParseTokenResp, error) { - req := auth.ParseTokenReq{ - Token: token, - } - resp, err := a.Client.ParseToken(ctx, &req) - if err != nil { - return nil, err - } - return resp, err -} - -func (a *Auth) InvalidateToken(ctx context.Context, preservedToken, userID string, platformID int) (*auth.InvalidateTokenResp, error) { - req := auth.InvalidateTokenReq{ - PreservedToken: preservedToken, - UserID: userID, - PlatformID: int32(platformID), - } - resp, err := a.Client.InvalidateToken(ctx, &req) - if err != nil { - return nil, err - } - return resp, err -} - -func (a *Auth) KickTokens(ctx context.Context, tokens []string) (*auth.KickTokensResp, error) { - req := auth.KickTokensReq{ - Tokens: tokens, - } - resp, err := a.Client.KickTokens(ctx, &req) - if err != nil { - return nil, err - } - return resp, err -} diff --git a/pkg/notification/conversation.go b/pkg/notification/conversation.go deleted file mode 100644 index c69d355d68..0000000000 --- a/pkg/notification/conversation.go +++ /dev/null @@ -1,161 +0,0 @@ -// Copyright © 2023 OpenIM. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package rpcclient - -import ( - "context" - "fmt" - - pbconversation "github.com/openimsdk/protocol/conversation" - "github.com/openimsdk/tools/discovery" - "github.com/openimsdk/tools/errs" - "github.com/openimsdk/tools/system/program" - "google.golang.org/grpc" -) - -type Conversation struct { - Client pbconversation.ConversationClient - conn grpc.ClientConnInterface - discov discovery.SvcDiscoveryRegistry -} - -func NewConversation(discov discovery.SvcDiscoveryRegistry, rpcRegisterName string) *Conversation { - conn, err := discov.GetConn(context.Background(), rpcRegisterName) - if err != nil { - program.ExitWithError(err) - } - client := pbconversation.NewConversationClient(conn) - return &Conversation{discov: discov, conn: conn, Client: client} -} - -type ConversationRpcClient Conversation - -func NewConversationRpcClient(discov discovery.SvcDiscoveryRegistry, rpcRegisterName string) ConversationRpcClient { - return ConversationRpcClient(*NewConversation(discov, rpcRegisterName)) -} - -func (c *ConversationRpcClient) GetSingleConversationRecvMsgOpt(ctx context.Context, userID, conversationID string) (int32, error) { - var req pbconversation.GetConversationReq - req.OwnerUserID = userID - req.ConversationID = conversationID - conversation, err := c.Client.GetConversation(ctx, &req) - if err != nil { - return 0, err - } - return conversation.GetConversation().RecvMsgOpt, err -} - -func (c *ConversationRpcClient) SingleChatFirstCreateConversation(ctx context.Context, recvID, sendID, - conversationID string, conversationType int32) error { - _, err := c.Client.CreateSingleChatConversations(ctx, - &pbconversation.CreateSingleChatConversationsReq{ - RecvID: recvID, SendID: sendID, ConversationID: conversationID, - ConversationType: conversationType, - }) - return err -} - -func (c *ConversationRpcClient) GroupChatFirstCreateConversation(ctx context.Context, groupID string, userIDs []string) error { - _, err := c.Client.CreateGroupChatConversations(ctx, &pbconversation.CreateGroupChatConversationsReq{UserIDs: userIDs, GroupID: groupID}) - return err -} - -func (c *ConversationRpcClient) SetConversationMaxSeq(ctx context.Context, ownerUserIDs []string, conversationID string, maxSeq int64) error { - _, err := c.Client.SetConversationMaxSeq(ctx, &pbconversation.SetConversationMaxSeqReq{OwnerUserID: ownerUserIDs, ConversationID: conversationID, MaxSeq: maxSeq}) - return err -} - -func (c *ConversationRpcClient) SetConversationMinSeq(ctx context.Context, ownerUserIDs []string, conversationID string, minSeq int64) error { - _, err := c.Client.SetConversationMinSeq(ctx, &pbconversation.SetConversationMinSeqReq{OwnerUserID: ownerUserIDs, ConversationID: conversationID, MinSeq: minSeq}) - return err -} - -func (c *ConversationRpcClient) SetConversations(ctx context.Context, userIDs []string, conversation *pbconversation.ConversationReq) error { - _, err := c.Client.SetConversations(ctx, &pbconversation.SetConversationsReq{UserIDs: userIDs, Conversation: conversation}) - return err -} - -func (c *ConversationRpcClient) UpdateConversation(ctx context.Context, conversation *pbconversation.UpdateConversationReq) error { - _, err := c.Client.UpdateConversation(ctx, conversation) - return err -} - -func (c *ConversationRpcClient) GetConversationIDs(ctx context.Context, ownerUserID string) ([]string, error) { - resp, err := c.Client.GetConversationIDs(ctx, &pbconversation.GetConversationIDsReq{UserID: ownerUserID}) - if err != nil { - return nil, err - } - return resp.ConversationIDs, nil -} - -func (c *ConversationRpcClient) GetConversation(ctx context.Context, ownerUserID, conversationID string) (*pbconversation.Conversation, error) { - resp, err := c.Client.GetConversation(ctx, &pbconversation.GetConversationReq{OwnerUserID: ownerUserID, ConversationID: conversationID}) - if err != nil { - return nil, err - } - return resp.Conversation, nil -} - -func (c *ConversationRpcClient) GetConversationsByConversationID(ctx context.Context, conversationIDs []string) ([]*pbconversation.Conversation, error) { - if len(conversationIDs) == 0 { - return nil, nil - } - resp, err := c.Client.GetConversationsByConversationID(ctx, &pbconversation.GetConversationsByConversationIDReq{ConversationIDs: conversationIDs}) - if err != nil { - return nil, err - } - if len(resp.Conversations) == 0 { - return nil, errs.ErrRecordNotFound.WrapMsg(fmt.Sprintf("conversationIDs: %v not found", conversationIDs)) - } - return resp.Conversations, nil -} - -func (c *ConversationRpcClient) GetConversationOfflinePushUserIDs(ctx context.Context, conversationID string, userIDs []string) ([]string, error) { - resp, err := c.Client.GetConversationOfflinePushUserIDs(ctx, &pbconversation.GetConversationOfflinePushUserIDsReq{ConversationID: conversationID, UserIDs: userIDs}) - if err != nil { - return nil, err - } - return resp.UserIDs, nil -} - -func (c *ConversationRpcClient) GetConversations(ctx context.Context, ownerUserID string, conversationIDs []string) ([]*pbconversation.Conversation, error) { - if len(conversationIDs) == 0 { - return nil, nil - } - resp, err := c.Client.GetConversations( - ctx, - &pbconversation.GetConversationsReq{OwnerUserID: ownerUserID, ConversationIDs: conversationIDs}, - ) - if err != nil { - return nil, err - } - return resp.Conversations, nil -} - -func (c *ConversationRpcClient) GetConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) ([]string, error) { - resp, err := c.Client.GetConversationNotReceiveMessageUserIDs(ctx, &pbconversation.GetConversationNotReceiveMessageUserIDsReq{ConversationID: conversationID}) - if err != nil { - return nil, err - } - return resp.UserIDs, nil -} - -func (c *ConversationRpcClient) GetConversationsNeedClearMsg(ctx context.Context) ([]*pbconversation.Conversation, error) { - resp, err := c.Client.GetConversationsNeedClearMsg(ctx, &pbconversation.GetConversationsNeedClearMsgReq{}) - if err != nil { - return nil, err - } - return resp.Conversations, nil -} diff --git a/pkg/notification/friend.go b/pkg/notification/friend.go deleted file mode 100644 index 359ed3a8b8..0000000000 --- a/pkg/notification/friend.go +++ /dev/null @@ -1,87 +0,0 @@ -// Copyright © 2023 OpenIM. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package rpcclient - -import ( - "context" - - "github.com/openimsdk/protocol/relation" - sdkws "github.com/openimsdk/protocol/sdkws" - "github.com/openimsdk/tools/discovery" - "github.com/openimsdk/tools/system/program" - "google.golang.org/grpc" -) - -type Friend struct { - conn grpc.ClientConnInterface - Client relation.FriendClient - discov discovery.SvcDiscoveryRegistry -} - -func NewFriend(discov discovery.SvcDiscoveryRegistry, rpcRegisterName string) *Friend { - conn, err := discov.GetConn(context.Background(), rpcRegisterName) - if err != nil { - program.ExitWithError(err) - } - client := relation.NewFriendClient(conn) - return &Friend{discov: discov, conn: conn, Client: client} -} - -type FriendRpcClient Friend - -func NewFriendRpcClient(discov discovery.SvcDiscoveryRegistry, rpcRegisterName string) FriendRpcClient { - return FriendRpcClient(*NewFriend(discov, rpcRegisterName)) -} - -func (f *FriendRpcClient) GetFriendsInfo( - ctx context.Context, - ownerUserID, relationUserID string, -) (resp *sdkws.FriendInfo, err error) { - r, err := f.Client.GetDesignatedFriends( - ctx, - &relation.GetDesignatedFriendsReq{OwnerUserID: ownerUserID, FriendUserIDs: []string{relationUserID}}, - ) - if err != nil { - return nil, err - } - resp = r.FriendsInfo[0] - return -} - -// possibleFriendUserID Is PossibleFriendUserId's relations. -func (f *FriendRpcClient) IsFriend(ctx context.Context, possibleFriendUserID, userID string) (bool, error) { - resp, err := f.Client.IsFriend(ctx, &relation.IsFriendReq{UserID1: userID, UserID2: possibleFriendUserID}) - if err != nil { - return false, err - } - return resp.InUser1Friends, nil -} - -func (f *FriendRpcClient) GetFriendIDs(ctx context.Context, ownerUserID string) (relationIDs []string, err error) { - req := relation.GetFriendIDsReq{UserID: ownerUserID} - resp, err := f.Client.GetFriendIDs(ctx, &req) - if err != nil { - return nil, err - } - return resp.FriendIDs, nil -} - -func (f *FriendRpcClient) IsBlack(ctx context.Context, possibleBlackUserID, userID string) (bool, error) { - r, err := f.Client.IsBlack(ctx, &relation.IsBlackReq{UserID1: possibleBlackUserID, UserID2: userID}) - if err != nil { - return false, err - } - return r.InUser2Blacks, nil -} diff --git a/pkg/notification/group.go b/pkg/notification/group.go deleted file mode 100644 index 30d0b3288f..0000000000 --- a/pkg/notification/group.go +++ /dev/null @@ -1,205 +0,0 @@ -// Copyright © 2023 OpenIM. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package rpcclient - -import ( - "context" - "strings" - - "github.com/openimsdk/open-im-server/v3/pkg/common/servererrs" - "github.com/openimsdk/protocol/constant" - "github.com/openimsdk/protocol/group" - "github.com/openimsdk/protocol/sdkws" - "github.com/openimsdk/tools/discovery" - "github.com/openimsdk/tools/system/program" - "github.com/openimsdk/tools/utils/datautil" -) - -type Group struct { - Client group.GroupClient - discov discovery.SvcDiscoveryRegistry -} - -func NewGroup(discov discovery.SvcDiscoveryRegistry, rpcRegisterName string) *Group { - conn, err := discov.GetConn(context.Background(), rpcRegisterName) - if err != nil { - program.ExitWithError(err) - } - client := group.NewGroupClient(conn) - return &Group{discov: discov, Client: client} -} - -type GroupRpcClient Group - -func NewGroupRpcClient(discov discovery.SvcDiscoveryRegistry, rpcRegisterName string) GroupRpcClient { - return GroupRpcClient(*NewGroup(discov, rpcRegisterName)) -} - -func (g *GroupRpcClient) GetGroupInfos(ctx context.Context, groupIDs []string, complete bool) ([]*sdkws.GroupInfo, error) { - resp, err := g.Client.GetGroupsInfo(ctx, &group.GetGroupsInfoReq{ - GroupIDs: groupIDs, - }) - if err != nil { - return nil, err - } - if complete { - if ids := datautil.Single(groupIDs, datautil.Slice(resp.GroupInfos, func(e *sdkws.GroupInfo) string { - return e.GroupID - })); len(ids) > 0 { - return nil, servererrs.ErrGroupIDNotFound.WrapMsg(strings.Join(ids, ",")) - } - } - return resp.GroupInfos, nil -} - -func (g *GroupRpcClient) GetGroupInfo(ctx context.Context, groupID string) (*sdkws.GroupInfo, error) { - groups, err := g.GetGroupInfos(ctx, []string{groupID}, true) - if err != nil { - return nil, err - } - return groups[0], nil -} - -func (g *GroupRpcClient) GetGroupInfoMap( - ctx context.Context, - groupIDs []string, - complete bool, -) (map[string]*sdkws.GroupInfo, error) { - groups, err := g.GetGroupInfos(ctx, groupIDs, complete) - if err != nil { - return nil, err - } - return datautil.SliceToMap(groups, func(e *sdkws.GroupInfo) string { - return e.GroupID - }), nil -} - -func (g *GroupRpcClient) GetGroupMemberInfos( - ctx context.Context, - groupID string, - userIDs []string, - complete bool, -) ([]*sdkws.GroupMemberFullInfo, error) { - resp, err := g.Client.GetGroupMembersInfo(ctx, &group.GetGroupMembersInfoReq{ - GroupID: groupID, - UserIDs: userIDs, - }) - if err != nil { - return nil, err - } - if complete { - if ids := datautil.Single(userIDs, datautil.Slice(resp.Members, func(e *sdkws.GroupMemberFullInfo) string { - return e.UserID - })); len(ids) > 0 { - return nil, servererrs.ErrNotInGroupYet.WrapMsg(strings.Join(ids, ",")) - } - } - return resp.Members, nil -} - -func (g *GroupRpcClient) GetGroupMemberInfo( - ctx context.Context, - groupID string, - userID string, -) (*sdkws.GroupMemberFullInfo, error) { - members, err := g.GetGroupMemberInfos(ctx, groupID, []string{userID}, true) - if err != nil { - return nil, err - } - return members[0], nil -} - -func (g *GroupRpcClient) GetGroupMemberInfoMap( - ctx context.Context, - groupID string, - userIDs []string, - complete bool, -) (map[string]*sdkws.GroupMemberFullInfo, error) { - members, err := g.GetGroupMemberInfos(ctx, groupID, userIDs, true) - if err != nil { - return nil, err - } - return datautil.SliceToMap(members, func(e *sdkws.GroupMemberFullInfo) string { - return e.UserID - }), nil -} - -func (g *GroupRpcClient) GetOwnerAndAdminInfos( - ctx context.Context, - groupID string, -) ([]*sdkws.GroupMemberFullInfo, error) { - resp, err := g.Client.GetGroupMemberRoleLevel(ctx, &group.GetGroupMemberRoleLevelReq{ - GroupID: groupID, - RoleLevels: []int32{constant.GroupOwner, constant.GroupAdmin}, - }) - if err != nil { - return nil, err - } - return resp.Members, nil -} - -func (g *GroupRpcClient) GetOwnerInfo(ctx context.Context, groupID string) (*sdkws.GroupMemberFullInfo, error) { - resp, err := g.Client.GetGroupMemberRoleLevel(ctx, &group.GetGroupMemberRoleLevelReq{ - GroupID: groupID, - RoleLevels: []int32{constant.GroupOwner}, - }) - return resp.Members[0], err -} - -func (g *GroupRpcClient) GetGroupMemberIDs(ctx context.Context, groupID string) ([]string, error) { - resp, err := g.Client.GetGroupMemberUserIDs(ctx, &group.GetGroupMemberUserIDsReq{ - GroupID: groupID, - }) - if err != nil { - return nil, err - } - return resp.UserIDs, nil -} - -func (g *GroupRpcClient) GetGroupInfoCache(ctx context.Context, groupID string) (*sdkws.GroupInfo, error) { - resp, err := g.Client.GetGroupInfoCache(ctx, &group.GetGroupInfoCacheReq{ - GroupID: groupID, - }) - if err != nil { - return nil, err - } - return resp.GroupInfo, nil -} - -func (g *GroupRpcClient) GetGroupMemberCache(ctx context.Context, groupID string, groupMemberID string) (*sdkws.GroupMemberFullInfo, error) { - resp, err := g.Client.GetGroupMemberCache(ctx, &group.GetGroupMemberCacheReq{ - GroupID: groupID, - GroupMemberID: groupMemberID, - }) - if err != nil { - return nil, err - } - return resp.Member, nil -} - -func (g *GroupRpcClient) DismissGroup(ctx context.Context, groupID string) error { - _, err := g.Client.DismissGroup(ctx, &group.DismissGroupReq{ - GroupID: groupID, - DeleteMember: true, - }) - return err -} - -func (g *GroupRpcClient) NotificationUserInfoUpdate(ctx context.Context, userID string) error { - _, err := g.Client.NotificationUserInfoUpdate(ctx, &group.NotificationUserInfoUpdateReq{ - UserID: userID, - }) - return err -} diff --git a/pkg/notification/msg.go b/pkg/notification/msg.go index 88cfb06a7a..0795982c8c 100644 --- a/pkg/notification/msg.go +++ b/pkg/notification/msg.go @@ -19,17 +19,14 @@ import ( "encoding/json" "time" - "google.golang.org/grpc" "google.golang.org/protobuf/proto" "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/protocol/constant" "github.com/openimsdk/protocol/msg" "github.com/openimsdk/protocol/sdkws" - "github.com/openimsdk/tools/discovery" "github.com/openimsdk/tools/log" "github.com/openimsdk/tools/mq/memamq" - "github.com/openimsdk/tools/system/program" "github.com/openimsdk/tools/utils/idutil" "github.com/openimsdk/tools/utils/jsonutil" "github.com/openimsdk/tools/utils/timeutil" @@ -129,126 +126,6 @@ func newSessionTypeConf() map[int32]int32 { } } -type Message struct { - conn grpc.ClientConnInterface - Client msg.MsgClient - discov discovery.SvcDiscoveryRegistry -} - -func NewMessage(discov discovery.SvcDiscoveryRegistry, rpcRegisterName string) *Message { - conn, err := discov.GetConn(context.Background(), rpcRegisterName) - if err != nil { - program.ExitWithError(err) - } - client := msg.NewMsgClient(conn) - return &Message{discov: discov, conn: conn, Client: client} -} - -type MessageRpcClient Message - -func NewMessageRpcClient(discov discovery.SvcDiscoveryRegistry, rpcRegisterName string) MessageRpcClient { - return MessageRpcClient(*NewMessage(discov, rpcRegisterName)) -} - -// SendMsg sends a message through the gRPC client and returns the response. -// It wraps any encountered error for better error handling and context understanding. -func (m *MessageRpcClient) SendMsg(ctx context.Context, req *msg.SendMsgReq) (*msg.SendMsgResp, error) { - resp, err := m.Client.SendMsg(ctx, req) - if err != nil { - return nil, err - } - return resp, nil -} - -// SetUserConversationsMinSeq set min seq -func (m *MessageRpcClient) SetUserConversationsMinSeq(ctx context.Context, req *msg.SetUserConversationsMinSeqReq) (*msg.SetUserConversationsMinSeqResp, error) { - resp, err := m.Client.SetUserConversationsMinSeq(ctx, req) - if err != nil { - return nil, err - } - return resp, nil -} - -// GetMaxSeq retrieves the maximum sequence number from the gRPC client. -// Errors during the gRPC call are wrapped to provide additional context. -func (m *MessageRpcClient) GetMaxSeq(ctx context.Context, req *sdkws.GetMaxSeqReq) (*sdkws.GetMaxSeqResp, error) { - resp, err := m.Client.GetMaxSeq(ctx, req) - if err != nil { - return nil, err - } - return resp, nil -} - -func (m *MessageRpcClient) GetMaxSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error) { - log.ZDebug(ctx, "GetMaxSeqs", "conversationIDs", conversationIDs) - resp, err := m.Client.GetMaxSeqs(ctx, &msg.GetMaxSeqsReq{ - ConversationIDs: conversationIDs, - }) - if err != nil { - return nil, err - } - return resp.MaxSeqs, err -} - -func (m *MessageRpcClient) GetHasReadSeqs(ctx context.Context, userID string, conversationIDs []string) (map[string]int64, error) { - resp, err := m.Client.GetHasReadSeqs(ctx, &msg.GetHasReadSeqsReq{ - UserID: userID, - ConversationIDs: conversationIDs, - }) - if err != nil { - return nil, err - } - return resp.MaxSeqs, err -} - -func (m *MessageRpcClient) GetMsgByConversationIDs(ctx context.Context, docIDs []string, seqs map[string]int64) (map[string]*sdkws.MsgData, error) { - resp, err := m.Client.GetMsgByConversationIDs(ctx, &msg.GetMsgByConversationIDsReq{ - ConversationIDs: docIDs, - MaxSeqs: seqs, - }) - if err != nil { - return nil, err - } - return resp.MsgDatas, err -} - -// PullMessageBySeqList retrieves messages by their sequence numbers using the gRPC client. -// It directly forwards the request to the gRPC client and returns the response along with any error encountered. -func (m *MessageRpcClient) PullMessageBySeqList(ctx context.Context, req *sdkws.PullMessageBySeqsReq) (*sdkws.PullMessageBySeqsResp, error) { - resp, err := m.Client.PullMessageBySeqs(ctx, req) - if err != nil { - // Wrap the error to provide more context if the gRPC call fails. - return nil, err - } - return resp, nil -} - -func (m *MessageRpcClient) GetConversationsHasReadAndMaxSeq(ctx context.Context, req *msg.GetConversationsHasReadAndMaxSeqReq) (*msg.GetConversationsHasReadAndMaxSeqResp, error) { - resp, err := m.Client.GetConversationsHasReadAndMaxSeq(ctx, req) - if err != nil { - // Wrap the error to provide more context if the gRPC call fails. - return nil, err - } - return resp, nil -} - -func (m *MessageRpcClient) GetSeqMessage(ctx context.Context, req *msg.GetSeqMessageReq) (*msg.GetSeqMessageResp, error) { - return m.Client.GetSeqMessage(ctx, req) -} - -func (m *MessageRpcClient) GetConversationMaxSeq(ctx context.Context, conversationID string) (int64, error) { - resp, err := m.Client.GetConversationMaxSeq(ctx, &msg.GetConversationMaxSeqReq{ConversationID: conversationID}) - if err != nil { - return 0, err - } - return resp.MaxSeq, nil -} - -func (m *MessageRpcClient) DestructMsgs(ctx context.Context, ts int64) error { - _, err := m.Client.DestructMsgs(ctx, &msg.DestructMsgsReq{Timestamp: ts}) - return err -} - type NotificationSender struct { contentTypeConf map[int32]config.NotificationConfig sessionTypeConf map[int32]int32 diff --git a/pkg/notification/push.go b/pkg/notification/push.go deleted file mode 100644 index c549e454a7..0000000000 --- a/pkg/notification/push.go +++ /dev/null @@ -1,52 +0,0 @@ -// Copyright © 2023 OpenIM. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package rpcclient - -import ( - "context" - - "github.com/openimsdk/protocol/push" - "github.com/openimsdk/tools/discovery" - "github.com/openimsdk/tools/system/program" - "google.golang.org/grpc" -) - -type Push struct { - conn grpc.ClientConnInterface - Client push.PushMsgServiceClient - discov discovery.SvcDiscoveryRegistry -} - -func NewPush(discov discovery.SvcDiscoveryRegistry, rpcRegisterName string) *Push { - conn, err := discov.GetConn(context.Background(), rpcRegisterName) - if err != nil { - program.ExitWithError(err) - } - return &Push{ - discov: discov, - conn: conn, - Client: push.NewPushMsgServiceClient(conn), - } -} - -type PushRpcClient Push - -func NewPushRpcClient(discov discovery.SvcDiscoveryRegistry, rpcRegisterName string) PushRpcClient { - return PushRpcClient(*NewPush(discov, rpcRegisterName)) -} - -func (p *PushRpcClient) DelUserPushToken(ctx context.Context, req *push.DelUserPushTokenReq) (*push.DelUserPushTokenResp, error) { - return p.Client.DelUserPushToken(ctx, req) -} diff --git a/pkg/notification/third.go b/pkg/notification/third.go deleted file mode 100644 index 7cdc60d52f..0000000000 --- a/pkg/notification/third.go +++ /dev/null @@ -1,47 +0,0 @@ -// Copyright © 2023 OpenIM. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package rpcclient - -import ( - "context" - - "github.com/openimsdk/protocol/third" - "github.com/openimsdk/tools/discovery" - "github.com/openimsdk/tools/system/program" - "google.golang.org/grpc" -) - -type Third struct { - conn grpc.ClientConnInterface - Client third.ThirdClient - discov discovery.SvcDiscoveryRegistry - GrafanaUrl string -} - -func NewThird(discov discovery.SvcDiscoveryRegistry, rpcRegisterName, grafanaUrl string) *Third { - conn, err := discov.GetConn(context.Background(), rpcRegisterName) - if err != nil { - program.ExitWithError(err) - } - client := third.NewThirdClient(conn) - if err != nil { - program.ExitWithError(err) - } - return &Third{discov: discov, Client: client, conn: conn, GrafanaUrl: grafanaUrl} -} -func (t *Third) DeleteOutdatedData(ctx context.Context, expires int64) error { - _, err := t.Client.DeleteOutdatedData(ctx, &third.DeleteOutdatedDataReq{ExpireTime: expires}) - return err -} diff --git a/pkg/rpccache/friend.go b/pkg/rpccache/friend.go index 2b247db4e0..8ed6c1ae9e 100644 --- a/pkg/rpccache/friend.go +++ b/pkg/rpccache/friend.go @@ -22,7 +22,6 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/localcache" - "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" "github.com/openimsdk/tools/log" "github.com/redis/go-redis/v9" ) diff --git a/pkg/rpccache/group.go b/pkg/rpccache/group.go index e0ee7a9811..174ba7dc5c 100644 --- a/pkg/rpccache/group.go +++ b/pkg/rpccache/group.go @@ -23,7 +23,6 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/localcache" - "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" "github.com/openimsdk/protocol/sdkws" "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/log" diff --git a/pkg/rpccache/online.go b/pkg/rpccache/online.go index e2e8d09cd7..8f53234775 100644 --- a/pkg/rpccache/online.go +++ b/pkg/rpccache/online.go @@ -15,7 +15,6 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey" "github.com/openimsdk/open-im-server/v3/pkg/localcache" "github.com/openimsdk/open-im-server/v3/pkg/localcache/lru" - "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" "github.com/openimsdk/open-im-server/v3/pkg/util/useronline" "github.com/openimsdk/tools/db/cacheutil" "github.com/openimsdk/tools/log" diff --git a/pkg/rpcclient/user.go b/pkg/rpcclient/user.go deleted file mode 100644 index bdc1a2e012..0000000000 --- a/pkg/rpcclient/user.go +++ /dev/null @@ -1,231 +0,0 @@ -// Copyright © 2023 OpenIM. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package rpcclient - -import ( - "context" - "strings" - - "github.com/openimsdk/open-im-server/v3/pkg/authverify" - "github.com/openimsdk/open-im-server/v3/pkg/common/servererrs" - "github.com/openimsdk/protocol/sdkws" - "github.com/openimsdk/protocol/user" - "github.com/openimsdk/tools/discovery" - "github.com/openimsdk/tools/system/program" - "github.com/openimsdk/tools/utils/datautil" - "google.golang.org/grpc" -) - -// User represents a structure holding connection details for the User RPC client. -type User struct { - conn grpc.ClientConnInterface - Client user.UserClient - Discov discovery.SvcDiscoveryRegistry - MessageGateWayRpcName string - imAdminUserID []string -} - -// NewUser initializes and returns a User instance based on the provided service discovery registry. -func NewUser(discov discovery.SvcDiscoveryRegistry, rpcRegisterName, messageGateWayRpcName string, - imAdminUserID []string) *User { - conn, err := discov.GetConn(context.Background(), rpcRegisterName) - if err != nil { - program.ExitWithError(err) - } - client := user.NewUserClient(conn) - return &User{Discov: discov, Client: client, - conn: conn, - MessageGateWayRpcName: messageGateWayRpcName, - imAdminUserID: imAdminUserID} -} - -// UserRpcClient represents the structure for a User RPC client. -type UserRpcClient User - -// NewUserRpcClientByUser initializes a UserRpcClient based on the provided User instance. -func NewUserRpcClientByUser(user *User) *UserRpcClient { - rpc := UserRpcClient(*user) - return &rpc -} - -// NewUserRpcClient initializes a UserRpcClient based on the provided service discovery registry. -func NewUserRpcClient(client discovery.SvcDiscoveryRegistry, rpcRegisterName string, - imAdminUserID []string) UserRpcClient { - return UserRpcClient(*NewUser(client, rpcRegisterName, "", imAdminUserID)) -} - -// GetUsersInfo retrieves information for multiple users based on their user IDs. -func (u *UserRpcClient) GetUsersInfo(ctx context.Context, userIDs []string) ([]*sdkws.UserInfo, error) { - if len(userIDs) == 0 { - return []*sdkws.UserInfo{}, nil - } - resp, err := u.Client.GetDesignateUsers(ctx, &user.GetDesignateUsersReq{ - UserIDs: userIDs, - }) - if err != nil { - return nil, err - } - if ids := datautil.Single(userIDs, datautil.Slice(resp.UsersInfo, func(e *sdkws.UserInfo) string { - return e.UserID - })); len(ids) > 0 { - return nil, servererrs.ErrUserIDNotFound.WrapMsg(strings.Join(ids, ",")) - } - return resp.UsersInfo, nil -} - -// GetUserInfo retrieves information for a single user based on the provided user ID. -func (u *UserRpcClient) GetUserInfo(ctx context.Context, userID string) (*sdkws.UserInfo, error) { - users, err := u.GetUsersInfo(ctx, []string{userID}) - if err != nil { - return nil, err - } - return users[0], nil -} - -// GetUsersInfoMap retrieves a map of user information indexed by their user IDs. -func (u *UserRpcClient) GetUsersInfoMap(ctx context.Context, userIDs []string) (map[string]*sdkws.UserInfo, error) { - users, err := u.GetUsersInfo(ctx, userIDs) - if err != nil { - return nil, err - } - return datautil.SliceToMap(users, func(e *sdkws.UserInfo) string { - return e.UserID - }), nil -} - -// GetPublicUserInfos retrieves public information for multiple users based on their user IDs. -func (u *UserRpcClient) GetPublicUserInfos( - ctx context.Context, - userIDs []string, -) ([]*sdkws.PublicUserInfo, error) { - users, err := u.GetUsersInfo(ctx, userIDs) - if err != nil { - return nil, err - } - - return datautil.Slice(users, func(e *sdkws.UserInfo) *sdkws.PublicUserInfo { - return &sdkws.PublicUserInfo{ - UserID: e.UserID, - Nickname: e.Nickname, - FaceURL: e.FaceURL, - Ex: e.Ex, - } - }), nil -} - -// GetPublicUserInfo retrieves public information for a single user based on the provided user ID. -func (u *UserRpcClient) GetPublicUserInfo(ctx context.Context, userID string) (*sdkws.PublicUserInfo, error) { - users, err := u.GetPublicUserInfos(ctx, []string{userID}) - if err != nil { - return nil, err - } - - return users[0], nil -} - -// GetPublicUserInfoMap retrieves a map of public user information indexed by their user IDs. -func (u *UserRpcClient) GetPublicUserInfoMap( - ctx context.Context, - userIDs []string, -) (map[string]*sdkws.PublicUserInfo, error) { - users, err := u.GetPublicUserInfos(ctx, userIDs) - if err != nil { - return nil, err - } - - return datautil.SliceToMap(users, func(e *sdkws.PublicUserInfo) string { - return e.UserID - }), nil -} - -// GetUserGlobalMsgRecvOpt retrieves the global message receive option for a user based on the provided user ID. -func (u *UserRpcClient) GetUserGlobalMsgRecvOpt(ctx context.Context, userID string) (int32, error) { - resp, err := u.Client.GetGlobalRecvMessageOpt(ctx, &user.GetGlobalRecvMessageOptReq{ - UserID: userID, - }) - if err != nil { - return 0, err - } - return resp.GlobalRecvMsgOpt, nil -} - -// Access verifies the access rights for the provided user ID. -func (u *UserRpcClient) Access(ctx context.Context, ownerUserID string) error { - _, err := u.GetUserInfo(ctx, ownerUserID) - if err != nil { - return err - } - return authverify.CheckAccessV3(ctx, ownerUserID, u.imAdminUserID) -} - -// GetAllUserID retrieves all user IDs with pagination options. -func (u *UserRpcClient) GetAllUserID(ctx context.Context, pageNumber, showNumber int32) (*user.GetAllUserIDResp, error) { - resp, err := u.Client.GetAllUserID(ctx, &user.GetAllUserIDReq{Pagination: &sdkws.RequestPagination{PageNumber: pageNumber, ShowNumber: showNumber}}) - if err != nil { - return nil, err - } - return resp, nil -} - -// GetAllUserIDs retrieves all user IDs with pagination options. -func (u *UserRpcClient) GetAllUserIDs(ctx context.Context, pageNumber, showNumber int32) ([]string, error) { - resp, err := u.Client.GetAllUserID(ctx, &user.GetAllUserIDReq{Pagination: &sdkws.RequestPagination{PageNumber: pageNumber, ShowNumber: showNumber}}) - if err != nil { - return nil, err - } - return resp.UserIDs, nil -} - -// SetUserStatus sets the status for a user based on the provided user ID, status, and platform ID. -func (u *UserRpcClient) SetUserStatus(ctx context.Context, userID string, status int32, platformID int) error { - _, err := u.Client.SetUserStatus(ctx, &user.SetUserStatusReq{ - UserID: userID, - Status: status, PlatformID: int32(platformID), - }) - return err -} - -func (u *UserRpcClient) GetNotificationByID(ctx context.Context, userID string) error { - _, err := u.Client.GetNotificationAccount(ctx, &user.GetNotificationAccountReq{ - UserID: userID, - }) - return err -} - -func (u *UserRpcClient) GetUsersOnlinePlatform(ctx context.Context, userIDs []string) ([]*user.OnlineStatus, error) { - if len(userIDs) == 0 { - return nil, nil - } - resp, err := u.Client.GetUserStatus(ctx, &user.GetUserStatusReq{UserIDs: userIDs, UserID: u.imAdminUserID[0]}) - if err != nil { - return nil, err - } - return resp.StatusList, nil -} - -func (u *UserRpcClient) GetUserOnlinePlatform(ctx context.Context, userID string) ([]int32, error) { - resp, err := u.GetUsersOnlinePlatform(ctx, []string{userID}) - if err != nil { - return nil, err - } - if len(resp) == 0 { - return nil, nil - } - return resp[0].PlatformIDs, nil -} - -func (u *UserRpcClient) GetAllOnlineUsers(ctx context.Context, cursor uint64) (*user.GetAllOnlineUsersResp, error) { - return u.Client.GetAllOnlineUsers(ctx, &user.GetAllOnlineUsersReq{Cursor: cursor}) -}