Skip to content

Commit

Permalink
rpc client
Browse files Browse the repository at this point in the history
  • Loading branch information
withchao committed Dec 20, 2024
1 parent e989506 commit 9625cc5
Show file tree
Hide file tree
Showing 22 changed files with 335 additions and 214 deletions.
5 changes: 5 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -221,3 +221,8 @@ require (
golang.org/x/crypto v0.27.0 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
)


replace (
github.com/openimsdk/protocol => /Users/chao/Desktop/code/protocol
)
57 changes: 19 additions & 38 deletions internal/rpc/conversation/conversation.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package conversation

import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/rpcli"
"sort"
"time"

Expand All @@ -26,9 +27,6 @@ import (
dbModel "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/open-im-server/v3/pkg/localcache"
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
pbgroup "github.com/openimsdk/protocol/group"
pbmsg "github.com/openimsdk/protocol/msg"
"github.com/openimsdk/protocol/rpccall"
"github.com/openimsdk/tools/db/redisutil"

"github.com/openimsdk/open-im-server/v3/pkg/common/convert"
Expand All @@ -52,6 +50,9 @@ type conversationServer struct {

conversationNotificationSender *ConversationNotificationSender
config *Config
// todo
msgClient *rpcli.MsgClient
groupClient *rpcli.GroupClient
}

type Config struct {
Expand Down Expand Up @@ -118,19 +119,12 @@ func (c *conversationServer) GetSortedConversationList(ctx context.Context, req
if len(conversations) == 0 {
return nil, errs.ErrRecordNotFound.Wrap()
}

maxSeqs, err := rpccall.ExtractField(ctx, pbmsg.GetMaxSeqsCaller.Invoke,
&pbmsg.GetMaxSeqsReq{ConversationIDs: conversationIDs},
(*pbmsg.SeqsInfoResp).GetMaxSeqs)
maxSeqs, err := c.msgClient.GetMaxSeqs(ctx, conversationIDs)
if err != nil {
return nil, err
}

chatLogs, err := rpccall.ExtractField(ctx, pbmsg.GetMsgByConversationIDsCaller.Invoke,
&pbmsg.GetMsgByConversationIDsReq{
ConversationIDs: conversationIDs,
MaxSeqs: maxSeqs,
}, (*pbmsg.GetMsgByConversationIDsResp).GetMsgDatas)
chatLogs, err := c.msgClient.GetMsgByConversationIDs(ctx, conversationIDs, maxSeqs)
if err != nil {
return nil, err
}
Expand All @@ -140,9 +134,7 @@ func (c *conversationServer) GetSortedConversationList(ctx context.Context, req
return nil, err
}

hasReadSeqs, err := rpccall.ExtractField(ctx, pbmsg.GetHasReadSeqsCaller.Invoke,
&pbmsg.GetHasReadSeqsReq{ConversationIDs: conversationIDs},
(*pbmsg.SeqsInfoResp).GetMaxSeqs)
hasReadSeqs, err := c.msgClient.GetHasReadSeqs(ctx, conversationIDs, req.UserID)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -230,14 +222,7 @@ func (c *conversationServer) SetConversations(ctx context.Context, req *pbconver
return nil, errs.ErrArgs.WrapMsg("conversation must not be nil")
}
if req.Conversation.ConversationType == constant.WriteGroupChatType {
groupInfo, err := rpccall.ExtractField(ctx, pbgroup.GetGroupsInfoCaller.Invoke,
&pbgroup.GetGroupsInfoReq{GroupIDs: []string{req.Conversation.GroupID}},
func(r *pbgroup.GetGroupsInfoResp) *sdkws.GroupInfo {
if len(r.GroupInfos) > 0 {
return r.GroupInfos[0]
}
return nil
})
groupInfo, err := c.groupClient.GetGroupInfo(ctx, req.Conversation.GroupID)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -444,14 +429,14 @@ func (c *conversationServer) CreateGroupChatConversations(ctx context.Context, r
return nil, err
}
conversationID := msgprocessor.GetConversationIDBySessionType(constant.ReadGroupChatType, req.GroupID)
if _, err := pbmsg.SetUserConversationMaxSeqCaller.Invoke(ctx, &pbmsg.SetUserConversationMaxSeqReq{ConversationID: conversationID, OwnerUserID: req.UserIDs, MaxSeq: 0}); err != nil {
if err := c.msgClient.SetUserConversationMaxSeq(ctx, conversationID, req.UserIDs, 0); err != nil {
return nil, err
}
return &pbconversation.CreateGroupChatConversationsResp{}, nil
}

func (c *conversationServer) SetConversationMaxSeq(ctx context.Context, req *pbconversation.SetConversationMaxSeqReq) (*pbconversation.SetConversationMaxSeqResp, error) {
if _, err := pbmsg.SetUserConversationMaxSeqCaller.Invoke(ctx, &pbmsg.SetUserConversationMaxSeqReq{ConversationID: req.ConversationID, OwnerUserID: req.OwnerUserID, MaxSeq: req.MaxSeq}); err != nil {
if err := c.msgClient.SetUserConversationMaxSeq(ctx, req.ConversationID, req.OwnerUserID, req.MaxSeq); err != nil {
return nil, err
}
if err := c.conversationDatabase.UpdateUsersConversationField(ctx, req.OwnerUserID, req.ConversationID,
Expand All @@ -465,7 +450,7 @@ func (c *conversationServer) SetConversationMaxSeq(ctx context.Context, req *pbc
}

func (c *conversationServer) SetConversationMinSeq(ctx context.Context, req *pbconversation.SetConversationMinSeqReq) (*pbconversation.SetConversationMinSeqResp, error) {
if _, err := pbmsg.SetUserConversationMinSeqCaller.Invoke(ctx, &pbmsg.SetUserConversationMinSeqReq{ConversationID: req.ConversationID, OwnerUserID: req.OwnerUserID, MinSeq: req.MinSeq}); err != nil {
if err := c.msgClient.SetUserConversationMin(ctx, req.ConversationID, req.OwnerUserID, req.MinSeq); err != nil {
return nil, err
}
if err := c.conversationDatabase.UpdateUsersConversationField(ctx, req.OwnerUserID, req.ConversationID,
Expand Down Expand Up @@ -584,9 +569,7 @@ func (c *conversationServer) getConversationInfo(
}
}
if len(groupIDs) != 0 {
groupInfos, err := rpccall.ExtractField(ctx, pbgroup.GetGroupsInfoCaller.Invoke,
&pbgroup.GetGroupsInfoReq{GroupIDs: groupIDs},
(*pbgroup.GetGroupsInfoResp).GetGroupInfos)
groupInfos, err := c.groupClient.GetGroupsInfo(ctx, groupIDs)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -774,23 +757,22 @@ func (c *conversationServer) ClearUserConversationMsg(ctx context.Context, req *
if conversation.IsMsgDestruct == false || conversation.MsgDestructTime == 0 {
continue
}
rcpReq := &pbmsg.GetLastMessageSeqByTimeReq{ConversationID: conversation.ConversationID, Time: req.Timestamp - conversation.MsgDestructTime}
resp, err := pbmsg.GetLastMessageSeqByTime.Invoke(ctx, rcpReq)
seq, err := c.msgClient.GetLastMessageSeqByTime(ctx, conversation.ConversationID, req.Timestamp-conversation.MsgDestructTime)
if err != nil {
return nil, err
}
if resp.Seq <= 0 {
log.ZDebug(ctx, "ClearUserConversationMsg GetLastMessageSeqByTime seq <= 0", "index", i, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID, "msgDestructTime", conversation.MsgDestructTime, "seq", resp.Seq)
if seq <= 0 {
log.ZDebug(ctx, "ClearUserConversationMsg GetLastMessageSeqByTime seq <= 0", "index", i, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID, "msgDestructTime", conversation.MsgDestructTime, "seq", seq)
if err := c.setConversationMinSeqAndLatestMsgDestructTime(ctx, conversation.ConversationID, conversation.OwnerUserID, -1, latestMsgDestructTime); err != nil {
return nil, err
}
continue
}
resp.Seq++
if err := c.setConversationMinSeqAndLatestMsgDestructTime(ctx, conversation.ConversationID, conversation.OwnerUserID, resp.Seq, latestMsgDestructTime); err != nil {
seq++
if err := c.setConversationMinSeqAndLatestMsgDestructTime(ctx, conversation.ConversationID, conversation.OwnerUserID, seq, latestMsgDestructTime); err != nil {
return nil, err
}
log.ZDebug(ctx, "ClearUserConversationMsg set min seq", "index", i, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID, "seq", resp.Seq, "msgDestructTime", conversation.MsgDestructTime)
log.ZDebug(ctx, "ClearUserConversationMsg set min seq", "index", i, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID, "seq", seq, "msgDestructTime", conversation.MsgDestructTime)
}
return &pbconversation.ClearUserConversationMsgResp{Count: int32(len(conversations))}, nil
}
Expand All @@ -800,8 +782,7 @@ func (c *conversationServer) setConversationMinSeqAndLatestMsgDestructTime(ctx c
"latest_msg_destruct_time": latestMsgDestructTime,
}
if minSeq >= 0 {
req := &pbmsg.SetUserConversationMinSeqReq{ConversationID: conversationID, OwnerUserID: []string{ownerUserID}, MinSeq: minSeq}
if _, err := pbmsg.SetUserConversationMinSeqCaller.Invoke(ctx, req); err != nil {
if err := c.msgClient.SetUserConversationMin(ctx, conversationID, []string{ownerUserID}, minSeq); err != nil {
return err
}
update["min_seq"] = minSeq
Expand Down
29 changes: 8 additions & 21 deletions internal/rpc/group/group.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package group
import (
"context"
"fmt"
"github.com/openimsdk/open-im-server/v3/pkg/rpcli"
"math/big"
"math/rand"
"strconv"
Expand All @@ -41,8 +42,6 @@ import (
"github.com/openimsdk/protocol/constant"
pbconv "github.com/openimsdk/protocol/conversation"
pbgroup "github.com/openimsdk/protocol/group"
"github.com/openimsdk/protocol/msg"
"github.com/openimsdk/protocol/rpccall"
"github.com/openimsdk/protocol/sdkws"
"github.com/openimsdk/protocol/wrapperspb"
"github.com/openimsdk/tools/db/mongoutil"
Expand All @@ -63,6 +62,9 @@ type groupServer struct {
notification *GroupNotificationSender
config *Config
webhookClient *webhook.Client
// todo
msgClient *rpcli.MsgClient
conversationClient *rpcli.ConversationClient
}

type Config struct {
Expand Down Expand Up @@ -950,18 +952,11 @@ func (g *groupServer) QuitGroup(ctx context.Context, req *pbgroup.QuitGroupReq)

func (g *groupServer) deleteMemberAndSetConversationSeq(ctx context.Context, groupID string, userIDs []string) error {
conversationID := msgprocessor.GetConversationIDBySessionType(constant.ReadGroupChatType, groupID)
maxSeq, err := rpccall.ExtractField(ctx, msg.GetConversationMaxSeqCaller.Invoke,
&msg.GetConversationMaxSeqReq{ConversationID: conversationID},
(*msg.GetConversationMaxSeqResp).GetMaxSeq)
maxSeq, err := g.msgClient.GetConversationMaxSeq(ctx, conversationID)
if err != nil {
return err
}

return pbconv.SetConversationMaxSeqCaller.Execute(ctx, &pbconv.SetConversationMaxSeqReq{
ConversationID: conversationID,
OwnerUserID: userIDs,
MaxSeq: maxSeq,
})
return g.conversationClient.SetConversationMaxSeq(ctx, conversationID, userIDs, maxSeq)
}

func (g *groupServer) SetGroupInfo(ctx context.Context, req *pbgroup.SetGroupInfoReq) (*pbgroup.SetGroupInfoResp, error) {
Expand Down Expand Up @@ -1037,11 +1032,7 @@ func (g *groupServer) SetGroupInfo(ctx context.Context, req *pbgroup.SetGroupInf
return
}
conversation.GroupAtType = &wrapperspb.Int32Value{Value: constant.GroupNotification}

if err := pbconv.SetConversationsCaller.Execute(ctx, &pbconv.SetConversationsReq{
UserIDs: resp.UserIDs,
Conversation: conversation,
}); err != nil {
if err := g.conversationClient.SetConversations(ctx, resp.UserIDs, conversation); err != nil {
log.ZWarn(ctx, "SetConversations", err, "UserIDs", resp.UserIDs, "conversation", conversation)
}
}()
Expand Down Expand Up @@ -1154,11 +1145,7 @@ func (g *groupServer) SetGroupInfoEx(ctx context.Context, req *pbgroup.SetGroupI
}

conversation.GroupAtType = &wrapperspb.Int32Value{Value: constant.GroupNotification}

if err := pbconv.SetConversationsCaller.Execute(ctx, &pbconv.SetConversationsReq{
UserIDs: resp.UserIDs,
Conversation: conversation,
}); err != nil {
if err := g.conversationClient.SetConversations(ctx, resp.UserIDs, conversation); err != nil {
log.ZWarn(ctx, "SetConversations", err, "UserIDs", resp.UserIDs, "conversation", conversation)
}
}()
Expand Down
4 changes: 4 additions & 0 deletions internal/rpc/group/notification.go
Original file line number Diff line number Diff line change
Expand Up @@ -518,9 +518,13 @@ func (g *GroupNotificationSender) GroupApplicationAgreeMemberEnterNotification(c

if !g.config.RpcConfig.EnableHistoryForNewMembers {
conversationID := msgprocessor.GetConversationIDBySessionType(constant.ReadGroupChatType, groupID)

maxSeq, err := rpccall.ExtractField(ctx, msg.GetConversationMaxSeqCaller.Invoke,
&msg.GetConversationMaxSeqReq{ConversationID: conversationID},
(*msg.GetConversationMaxSeqResp).GetMaxSeq)
maxSeq,err := g.


if err != nil {
return err
}
Expand Down
78 changes: 2 additions & 76 deletions internal/rpc/msg/clear.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,13 @@ package msg

import (
"context"
"strings"
"time"

"github.com/openimsdk/open-im-server/v3/pkg/authverify"
"github.com/openimsdk/open-im-server/v3/pkg/common/convert"
pbconv "github.com/openimsdk/protocol/conversation"
"github.com/openimsdk/protocol/msg"
"github.com/openimsdk/protocol/wrapperspb"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/mcontext"
"github.com/openimsdk/tools/utils/datautil"
"github.com/openimsdk/tools/utils/idutil"
"github.com/openimsdk/tools/utils/stringutil"
"golang.org/x/sync/errgroup"
"strings"
)

// hard delete in Database.
// DestructMsgs hard delete in Database.
func (m *msgServer) DestructMsgs(ctx context.Context, req *msg.DestructMsgsReq) (*msg.DestructMsgsResp, error) {
if err := authverify.CheckAdmin(ctx, m.config.Share.IMAdminUserID); err != nil {
return nil, err
Expand Down Expand Up @@ -61,70 +51,6 @@ func (m *msgServer) DestructMsgs(ctx context.Context, req *msg.DestructMsgsReq)
return &msg.DestructMsgsResp{Count: int32(len(docs))}, nil
}

// soft delete for user self
func (m *msgServer) ClearMsg(ctx context.Context, req *msg.ClearMsgReq) (*msg.ClearMsgResp, error) {
temp := convert.ConversationsPb2DB(req.Conversations)

batchNum := 100

errg, _ := errgroup.WithContext(ctx)
errg.SetLimit(100)

for i := 0; i < len(temp); i += batchNum {
batch := temp[i:min(i+batchNum, len(temp))]

errg.Go(func() error {
for _, conversation := range batch {
handleCtx := mcontext.NewCtx(stringutil.GetSelfFuncName() + "-" + idutil.OperationIDGenerator() + "-" + conversation.ConversationID + "-" + conversation.OwnerUserID)
log.ZDebug(handleCtx, "User MsgsDestruct",
"conversationID", conversation.ConversationID,
"ownerUserID", conversation.OwnerUserID,
"msgDestructTime", conversation.MsgDestructTime,
"lastMsgDestructTime", conversation.LatestMsgDestructTime)

seqs, err := m.MsgDatabase.ClearUserMsgs(handleCtx, conversation.OwnerUserID, conversation.ConversationID, conversation.MsgDestructTime, conversation.LatestMsgDestructTime)
if err != nil {
log.ZError(handleCtx, "user msg destruct failed", err, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID)
continue
}

if len(seqs) > 0 {
minseq := datautil.Max(seqs...)

// update
if err := pbconv.UpdateConversationCaller.Execute(ctx, &pbconv.UpdateConversationReq{
ConversationID: conversation.ConversationID,
UserIDs: []string{conversation.OwnerUserID},
MinSeq: wrapperspb.Int64(minseq),
LatestMsgDestructTime: wrapperspb.Int64(time.Now().UnixMilli()),
}); err != nil {
log.ZError(handleCtx, "updateUsersConversationField failed", err, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID)
continue
}

if err := pbconv.SetConversationMinSeqCaller.Execute(ctx, &pbconv.SetConversationMinSeqReq{
ConversationID: conversation.ConversationID,
OwnerUserID: []string{conversation.OwnerUserID},
MinSeq: minseq,
}); err != nil {
return err
}

// if you need Notify SDK client userseq is update.
// m.msgNotificationSender.UserDeleteMsgsNotification(handleCtx, conversation.OwnerUserID, conversation.ConversationID, seqs)
}
}
return nil
})
}

if err := errg.Wait(); err != nil {
return nil, err
}

return nil, nil
}

func (m *msgServer) GetLastMessageSeqByTime(ctx context.Context, req *msg.GetLastMessageSeqByTimeReq) (*msg.GetLastMessageSeqByTimeResp, error) {
seq, err := m.MsgDatabase.GetLastMessageSeqByTime(ctx, req.ConversationID, req.Time)
if err != nil {
Expand Down
Loading

0 comments on commit 9625cc5

Please sign in to comment.