Skip to content

Commit

Permalink
rpc client
Browse files Browse the repository at this point in the history
  • Loading branch information
withchao committed Dec 23, 2024
1 parent 9625cc5 commit a11dd60
Show file tree
Hide file tree
Showing 40 changed files with 707 additions and 540 deletions.
64 changes: 27 additions & 37 deletions internal/api/jssdk/jssdk.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,15 @@ package jssdk

import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/rpcli"
"sort"

"github.com/gin-gonic/gin"
"github.com/openimsdk/protocol/conversation"
"github.com/openimsdk/protocol/group"
"github.com/openimsdk/protocol/jssdk"
"github.com/openimsdk/protocol/msg"
"github.com/openimsdk/protocol/relation"
"github.com/openimsdk/protocol/sdkws"
"github.com/openimsdk/protocol/user"
"github.com/openimsdk/tools/mcontext"
"github.com/openimsdk/tools/utils/datautil"
)
Expand All @@ -26,6 +25,11 @@ func NewJSSdkApi() *JSSdk {
}

type JSSdk struct {
userClient rpcli.UserClient
relationClient rpcli.RelationClient
groupClient rpcli.GroupClient
conversationClient rpcli.ConversationClient
msgClient rpcli.MsgClient
}

func (x *JSSdk) GetActiveConversations(c *gin.Context) {
Expand Down Expand Up @@ -57,23 +61,23 @@ func (x *JSSdk) fillConversations(ctx context.Context, conversations []*jssdk.Co
groupMap map[string]*sdkws.GroupInfo
)
if len(userIDs) > 0 {
users, err := field(ctx, user.GetDesignateUsersCaller.Invoke, &user.GetDesignateUsersReq{UserIDs: userIDs}, (*user.GetDesignateUsersResp).GetUsersInfo)
users, err := x.userClient.GetUsersInfo(ctx, userIDs)
if err != nil {
return err
}
friends, err := field(ctx, relation.GetFriendInfoCaller.Invoke, &relation.GetFriendInfoReq{OwnerUserID: conversations[0].Conversation.OwnerUserID, FriendUserIDs: userIDs}, (*relation.GetFriendInfoResp).GetFriendInfos)
friends, err := x.relationClient.GetFriendsInfo(ctx, conversations[0].Conversation.OwnerUserID, userIDs)
if err != nil {
return err
}
userMap = datautil.SliceToMap(users, (*sdkws.UserInfo).GetUserID)
friendMap = datautil.SliceToMap(friends, (*relation.FriendInfoOnly).GetFriendUserID)
}
if len(groupIDs) > 0 {
resp, err := group.GetGroupsInfoCaller.Invoke(ctx, &group.GetGroupsInfoReq{GroupIDs: groupIDs})
groups, err := x.groupClient.GetGroupsInfo(ctx, groupIDs)
if err != nil {
return err
}
groupMap = datautil.SliceToMap(resp.GroupInfos, (*sdkws.GroupInfo).GetGroupID)
groupMap = datautil.SliceToMap(groups, (*sdkws.GroupInfo).GetGroupID)
}
for _, c := range conversations {
if c.Conversation.GroupID == "" {
Expand All @@ -91,21 +95,18 @@ func (x *JSSdk) getActiveConversations(ctx context.Context, req *jssdk.GetActive
req.Count = defaultGetActiveConversation
}
req.OwnerUserID = mcontext.GetOpUserID(ctx)
conversationIDs, err := field(ctx, conversation.GetConversationIDsCaller.Invoke,
&conversation.GetConversationIDsReq{UserID: req.OwnerUserID}, (*conversation.GetConversationIDsResp).GetConversationIDs)
conversationIDs, err := x.conversationClient.GetConversationIDs(ctx, req.OwnerUserID)
if err != nil {
return nil, err
}
if len(conversationIDs) == 0 {
return &jssdk.GetActiveConversationsResp{}, nil
}
readSeq, err := field(ctx, msg.GetHasReadSeqsCaller.Invoke,
&msg.GetHasReadSeqsReq{UserID: req.OwnerUserID, ConversationIDs: conversationIDs}, (*msg.SeqsInfoResp).GetMaxSeqs)
readSeq, err := x.msgClient.GetHasReadSeqs(ctx, conversationIDs, req.OwnerUserID)
if err != nil {
return nil, err
}
activeConversation, err := field(ctx, msg.GetActiveConversationCaller.Invoke,
&msg.GetActiveConversationReq{ConversationIDs: conversationIDs}, (*msg.GetActiveConversationResp).GetConversations)
activeConversation, err := x.msgClient.GetActiveConversation(ctx, conversationIDs)
if err != nil {
return nil, err
}
Expand All @@ -116,34 +117,26 @@ func (x *JSSdk) getActiveConversations(ctx context.Context, req *jssdk.GetActive
Conversation: activeConversation,
}
if len(activeConversation) > 1 {
pinnedConversationIDs, err := field(ctx, conversation.GetPinnedConversationIDsCaller.Invoke,
&conversation.GetPinnedConversationIDsReq{UserID: req.OwnerUserID}, (*conversation.GetPinnedConversationIDsResp).GetConversationIDs)
pinnedConversationIDs, err := x.conversationClient.GetPinnedConversationIDs(ctx, req.OwnerUserID)
if err != nil {
return nil, err
}
sortConversations.PinnedConversationIDs = datautil.SliceSet(pinnedConversationIDs)
}
sort.Sort(&sortConversations)
sortList := sortConversations.Top(int(req.Count))
conversations, err := field(ctx, conversation.GetConversationsCaller.Invoke,
&conversation.GetConversationsReq{
OwnerUserID: req.OwnerUserID,
ConversationIDs: datautil.Slice(sortList, func(c *msg.ActiveConversation) string {
return c.ConversationID
})}, (*conversation.GetConversationsResp).GetConversations)
conversations, err := x.conversationClient.GetConversations(ctx, datautil.Slice(sortList, func(c *msg.ActiveConversation) string {
return c.ConversationID
}), req.OwnerUserID)
if err != nil {
return nil, err
}
msgs, err := field(ctx, msg.GetSeqMessageCaller.Invoke,
&msg.GetSeqMessageReq{
UserID: req.OwnerUserID,
Conversations: datautil.Slice(sortList, func(c *msg.ActiveConversation) *msg.ConversationSeqs {
return &msg.ConversationSeqs{
ConversationID: c.ConversationID,
Seqs: []int64{c.MaxSeq},
}
}),
}, (*msg.GetSeqMessageResp).GetMsgs)
msgs, err := x.msgClient.GetSeqMessage(ctx, req.OwnerUserID, datautil.Slice(sortList, func(c *msg.ActiveConversation) *msg.ConversationSeqs {
return &msg.ConversationSeqs{
ConversationID: c.ConversationID,
Seqs: []int64{c.MaxSeq},
}
}))
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -185,7 +178,7 @@ func (x *JSSdk) getActiveConversations(ctx context.Context, req *jssdk.GetActive

func (x *JSSdk) getConversations(ctx context.Context, req *jssdk.GetConversationsReq) (*jssdk.GetConversationsResp, error) {
req.OwnerUserID = mcontext.GetOpUserID(ctx)
conversations, err := field(ctx, conversation.GetConversationsCaller.Invoke, &conversation.GetConversationsReq{OwnerUserID: req.OwnerUserID, ConversationIDs: req.ConversationIDs}, (*conversation.GetConversationsResp).GetConversations)
conversations, err := x.conversationClient.GetConversations(ctx, req.ConversationIDs, req.OwnerUserID)
if err != nil {
return nil, err
}
Expand All @@ -195,13 +188,11 @@ func (x *JSSdk) getConversations(ctx context.Context, req *jssdk.GetConversation
req.ConversationIDs = datautil.Slice(conversations, func(c *conversation.Conversation) string {
return c.ConversationID
})
maxSeqs, err := field(ctx, msg.GetMaxSeqsCaller.Invoke,
&msg.GetMaxSeqsReq{ConversationIDs: req.ConversationIDs}, (*msg.SeqsInfoResp).GetMaxSeqs)
maxSeqs, err := x.msgClient.GetMaxSeqs(ctx, req.ConversationIDs)
if err != nil {
return nil, err
}
readSeqs, err := field(ctx, msg.GetHasReadSeqsCaller.Invoke,
&msg.GetHasReadSeqsReq{UserID: req.OwnerUserID, ConversationIDs: req.ConversationIDs}, (*msg.SeqsInfoResp).GetMaxSeqs)
readSeqs, err := x.msgClient.GetHasReadSeqs(ctx, req.ConversationIDs, req.OwnerUserID)
if err != nil {
return nil, err
}
Expand All @@ -216,8 +207,7 @@ func (x *JSSdk) getConversations(ctx context.Context, req *jssdk.GetConversation
}
var msgs map[string]*sdkws.PullMsgs
if len(conversationSeqs) > 0 {
msgs, err = field(ctx, msg.GetSeqMessageCaller.Invoke,
&msg.GetSeqMessageReq{UserID: req.OwnerUserID, Conversations: conversationSeqs}, (*msg.GetSeqMessageResp).GetMsgs)
msgs, err = x.msgClient.GetSeqMessage(ctx, req.OwnerUserID, conversationSeqs)
if err != nil {
return nil, err
}
Expand Down
5 changes: 3 additions & 2 deletions internal/msggateway/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,9 @@ func Start(ctx context.Context, index int, conf *Config) error {
)

hubServer := NewServer(longServer, conf, func(srv *Server) error {
longServer.online, _ = rpccache.NewOnlineCache(conf.Share.IMAdminUserID, nil, rdb, false, longServer.subscriberUserOnlineStatusChanges)
return nil
var err error
longServer.online, err = rpccache.NewOnlineCache(conf.Share.IMAdminUserID, nil, rdb, false, longServer.subscriberUserOnlineStatusChanges)
return err
})

go longServer.ChangeOnlineStatus(4)
Expand Down
7 changes: 1 addition & 6 deletions internal/msgtransfer/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"strconv"
"syscall"

"github.com/openimsdk/open-im-server/v3/pkg/rpcclient"
"github.com/openimsdk/tools/discovery"
"github.com/openimsdk/tools/discovery/etcd"
"github.com/openimsdk/tools/utils/jsonutil"
Expand Down Expand Up @@ -93,10 +92,6 @@ func Start(ctx context.Context, index int, config *Config) error {
}
client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin")))
if err = rpcclient.InitRpcCaller(client, config.Discovery.RpcService); err != nil {
return err
}

msgModel := redis.NewMsgCache(rdb)
msgDocModel, err := mgo.NewMsgMongo(mgocli.GetDB())
if err != nil {
Expand All @@ -116,7 +111,7 @@ func Start(ctx context.Context, index int, config *Config) error {
if err != nil {
return err
}
historyCH, err := NewOnlineHistoryRedisConsumerHandler(&config.KafkaConfig, msgTransferDatabase)
historyCH, err := NewOnlineHistoryRedisConsumerHandler(ctx, client, config, msgTransferDatabase)
if err != nil {
return err
}
Expand Down
42 changes: 24 additions & 18 deletions internal/msgtransfer/online_history_msg_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,21 @@ import (
"context"
"encoding/json"
"errors"
"github.com/openimsdk/open-im-server/v3/pkg/rpcli"
"github.com/openimsdk/tools/discovery"
"strconv"
"strings"
"sync"
"time"

"github.com/IBM/sarama"
"github.com/go-redis/redis"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller"
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
"github.com/openimsdk/open-im-server/v3/pkg/tools/batcher"
"github.com/openimsdk/protocol/constant"
pbconv "github.com/openimsdk/protocol/conversation"
"github.com/openimsdk/protocol/group"
"github.com/openimsdk/protocol/rpccall"
"github.com/openimsdk/protocol/sdkws"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log"
Expand Down Expand Up @@ -72,16 +71,30 @@ type OnlineHistoryRedisConsumerHandler struct {
msgTransferDatabase controller.MsgTransferDatabase
conversationUserHasReadChan chan *userHasReadSeq
wg sync.WaitGroup

groupClient *rpcli.GroupClient
conversationClient *rpcli.ConversationClient
}

func NewOnlineHistoryRedisConsumerHandler(kafkaConf *config.Kafka, database controller.MsgTransferDatabase) (*OnlineHistoryRedisConsumerHandler, error) {
func NewOnlineHistoryRedisConsumerHandler(ctx context.Context, client discovery.SvcDiscoveryRegistry, config *Config, database controller.MsgTransferDatabase) (*OnlineHistoryRedisConsumerHandler, error) {
kafkaConf := config.KafkaConfig
historyConsumerGroup, err := kafka.NewMConsumerGroup(kafkaConf.Build(), kafkaConf.ToRedisGroupID, []string{kafkaConf.ToRedisTopic}, false)
if err != nil {
return nil, err
}
groupConn, err := client.GetConn(ctx, config.Discovery.RpcService.Group)
if err != nil {
return nil, err
}
conversationConn, err := client.GetConn(ctx, config.Discovery.RpcService.Conversation)
if err != nil {
return nil, err
}
var och OnlineHistoryRedisConsumerHandler
och.msgTransferDatabase = database
och.conversationUserHasReadChan = make(chan *userHasReadSeq, hasReadChanBuffer)
och.groupClient = rpcli.NewGroupClient(groupConn)
och.conversationClient = rpcli.NewConversationClient(conversationConn)
och.wg.Add(1)

b := batcher.New[sarama.ConsumerMessage](
Expand Down Expand Up @@ -109,15 +122,13 @@ func (och *OnlineHistoryRedisConsumerHandler) do(ctx context.Context, channelID
ctx = mcontext.WithTriggerIDContext(ctx, val.TriggerID())
ctxMessages := och.parseConsumerMessages(ctx, val.Val())
ctx = withAggregationCtx(ctx, ctxMessages)
log.ZInfo(ctx, "msg arrived channel", "channel id", channelID, "msgList length", len(ctxMessages),
"key", val.Key())
log.ZInfo(ctx, "msg arrived channel", "channel id", channelID, "msgList length", len(ctxMessages), "key", val.Key())
och.doSetReadSeq(ctx, ctxMessages)

storageMsgList, notStorageMsgList, storageNotificationList, notStorageNotificationList :=
och.categorizeMessageLists(ctxMessages)
log.ZDebug(ctx, "number of categorized messages", "storageMsgList", len(storageMsgList), "notStorageMsgList",
len(notStorageMsgList), "storageNotificationList", len(storageNotificationList), "notStorageNotificationList",
len(notStorageNotificationList))
len(notStorageMsgList), "storageNotificationList", len(storageNotificationList), "notStorageNotificationList", len(notStorageNotificationList))

conversationIDMsg := msgprocessor.GetChatConversationIDByMsg(ctxMessages[0].message)
conversationIDNotification := msgprocessor.GetNotificationConversationIDByMsg(ctxMessages[0].message)
Expand Down Expand Up @@ -282,31 +293,26 @@ func (och *OnlineHistoryRedisConsumerHandler) handleMsg(ctx context.Context, key
log.ZDebug(ctx, "group chat first create conversation", "conversationID",
conversationID)

userIDs, err := rpccall.ExtractField(ctx, group.GetGroupMemberUserIDsCaller.Invoke,
&group.GetGroupMemberUserIDsReq{
GroupID: msg.GroupID,
}, (*group.GetGroupMemberUserIDsResp).GetUserIDs)
userIDs, err := och.groupClient.GetGroupMemberUserIDs(ctx, msg.GroupID)
if err != nil {
log.ZWarn(ctx, "get group member ids error", err, "conversationID",
conversationID)
} else {
log.ZInfo(ctx, "GetGroupMemberIDs end")

if err := pbconv.CreateGroupChatConversationsCaller.Execute(ctx, &pbconv.CreateGroupChatConversationsReq{
UserIDs: userIDs,
GroupID: msg.GroupID,
}); err != nil {
if err := och.conversationClient.CreateGroupChatConversations(ctx, msg.GroupID, userIDs); err != nil {
log.ZWarn(ctx, "single chat first create conversation error", err,
"conversationID", conversationID)
}
}
case constant.SingleChatType, constant.NotificationChatType:
if err := pbconv.CreateSingleChatConversationsCaller.Execute(ctx, &pbconv.CreateSingleChatConversationsReq{
req := &pbconv.CreateSingleChatConversationsReq{
RecvID: msg.RecvID,
SendID: msg.SendID,
ConversationID: conversationID,
ConversationType: msg.SessionType,
}); err != nil {
}
if err := och.conversationClient.CreateSingleChatConversations(ctx, req); err != nil {
log.ZWarn(ctx, "single chat or notification first create conversation error", err,
"conversationID", conversationID, "sessionType", msg.SessionType)
}
Expand Down
7 changes: 1 addition & 6 deletions internal/push/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,6 @@ type Config struct {
runTimeEnv string
}

func (p pushServer) PushMsg(ctx context.Context, req *pbpush.PushMsgReq) (*pbpush.PushMsgResp, error) {
//todo reserved Interface
return nil, nil
}

func (p pushServer) DelUserPushToken(ctx context.Context,
req *pbpush.DelUserPushTokenReq) (resp *pbpush.DelUserPushTokenResp, err error) {
if err = p.database.DelFcmToken(ctx, req.UserID, int(req.PlatformID)); err != nil {
Expand All @@ -65,7 +60,7 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg

database := controller.NewPushDatabase(cacheModel, &config.KafkaConfig)

consumer, err := NewConsumerHandler(config, database, offlinePusher, rdb, client)
consumer, err := NewConsumerHandler(ctx, config, database, offlinePusher, rdb, client)
if err != nil {
return err
}
Expand Down
Loading

0 comments on commit a11dd60

Please sign in to comment.