From 1219b47c79246b79553580c243f758d7b5db7cb6 Mon Sep 17 00:00:00 2001 From: withchao <993506633@qq.com> Date: Thu, 26 Dec 2024 17:24:36 +0800 Subject: [PATCH] redis msg cache --- internal/rpc/msg/revoke.go | 5 +++-- pkg/common/storage/cache/msg.go | 2 +- pkg/common/storage/cache/redis/msg.go | 6 +++--- pkg/common/storage/controller/msg.go | 3 +++ pkg/common/storage/controller/msg_transfer.go | 7 ++++++- pkg/common/storage/database/msg.go | 2 +- 6 files changed, 17 insertions(+), 8 deletions(-) diff --git a/internal/rpc/msg/revoke.go b/internal/rpc/msg/revoke.go index b7cc7df62e..97de0f48a7 100644 --- a/internal/rpc/msg/revoke.go +++ b/internal/rpc/msg/revoke.go @@ -63,7 +63,8 @@ func (m *msgServer) RevokeMsg(ctx context.Context, req *msg.RevokeMsgReq) (*msg. log.ZDebug(ctx, "GetMsgBySeqs", "conversationID", req.ConversationID, "seq", req.Seq, "msg", string(data)) var role int32 if !authverify.IsAppManagerUid(ctx, m.config.Share.IMAdminUserID) { - switch msgs[0].SessionType { + sessionType := msgs[0].SessionType + switch sessionType { case constant.SingleChatType: if err := authverify.CheckAccessV3(ctx, msgs[0].SendID, m.config.Share.IMAdminUserID); err != nil { return nil, err @@ -89,7 +90,7 @@ func (m *msgServer) RevokeMsg(ctx context.Context, req *msg.RevokeMsgReq) (*msg. role = member.RoleLevel } default: - return nil, errs.ErrInternalServer.WrapMsg("msg sessionType not supported") + return nil, errs.ErrInternalServer.WrapMsg("msg sessionType not supported", "sessionType", sessionType) } } now := time.Now().UnixMilli() diff --git a/pkg/common/storage/cache/msg.go b/pkg/common/storage/cache/msg.go index f780cd5377..271ed19fe3 100644 --- a/pkg/common/storage/cache/msg.go +++ b/pkg/common/storage/cache/msg.go @@ -25,5 +25,5 @@ type MsgCache interface { GetMessageBySeqs(ctx context.Context, conversationID string, seqs []int64) ([]*model.MsgInfoModel, error) DelMessageBySeqs(ctx context.Context, conversationID string, seqs []int64) error - SetMessageBySeqs(ctx context.Context, conversationID string, msgs []*model.MsgDataModel) error + SetMessageBySeqs(ctx context.Context, conversationID string, msgs []*model.MsgInfoModel) error } diff --git a/pkg/common/storage/cache/redis/msg.go b/pkg/common/storage/cache/redis/msg.go index b6799d5dcc..0651f02830 100644 --- a/pkg/common/storage/cache/redis/msg.go +++ b/pkg/common/storage/cache/redis/msg.go @@ -79,16 +79,16 @@ func (c *msgCache) DelMessageBySeqs(ctx context.Context, conversationID string, return nil } -func (c *msgCache) SetMessageBySeqs(ctx context.Context, conversationID string, msgs []*model.MsgDataModel) error { +func (c *msgCache) SetMessageBySeqs(ctx context.Context, conversationID string, msgs []*model.MsgInfoModel) error { for _, msg := range msgs { - if msg == nil || msg.Seq <= 0 { + if msg == nil || msg.Msg == nil || msg.Msg.Seq <= 0 { continue } data, err := json.Marshal(msg) if err != nil { return err } - if err := c.rcClient.RawSet(ctx, cachekey.GetMsgCacheKey(conversationID, msg.Seq), string(data), msgCacheTimeout); err != nil { + if err := c.rcClient.RawSet(ctx, cachekey.GetMsgCacheKey(conversationID, msg.Msg.Seq), string(data), msgCacheTimeout); err != nil { return err } } diff --git a/pkg/common/storage/controller/msg.go b/pkg/common/storage/controller/msg.go index 8c97aa6cee..d5ad12584c 100644 --- a/pkg/common/storage/controller/msg.go +++ b/pkg/common/storage/controller/msg.go @@ -260,6 +260,9 @@ func (db *commonMsgDatabase) getMsgBySeqs(ctx context.Context, userID, conversat } func (db *commonMsgDatabase) handlerDBMsg(ctx context.Context, cache map[int64][]*model.MsgInfoModel, userID, conversationID string, msg *model.MsgInfoModel) { + if msg == nil || msg.Msg == nil { + return + } if msg.IsRead { msg.Msg.IsRead = true } diff --git a/pkg/common/storage/controller/msg_transfer.go b/pkg/common/storage/controller/msg_transfer.go index f0e4f43328..f4c0c6270f 100644 --- a/pkg/common/storage/controller/msg_transfer.go +++ b/pkg/common/storage/controller/msg_transfer.go @@ -252,7 +252,12 @@ func (db *msgTransferDatabase) BatchInsertChat2Cache(ctx context.Context, conver userSeqMap[m.SendID] = m.Seq seqs = append(seqs, m.Seq) } - if err := db.msgCache.SetMessageBySeqs(ctx, conversationID, datautil.Slice(msgs, convert.MsgPb2DB)); err != nil { + msgToDB := func(msg *sdkws.MsgData) *model.MsgInfoModel { + return &model.MsgInfoModel{ + Msg: convert.MsgPb2DB(msg), + } + } + if err := db.msgCache.SetMessageBySeqs(ctx, conversationID, datautil.Slice(msgs, msgToDB)); err != nil { return 0, false, nil, err } return lastMaxSeq, isNew, userSeqMap, nil diff --git a/pkg/common/storage/database/msg.go b/pkg/common/storage/database/msg.go index d964b18269..c92f182a37 100644 --- a/pkg/common/storage/database/msg.go +++ b/pkg/common/storage/database/msg.go @@ -24,7 +24,7 @@ import ( ) type Msg interface { - PushMsgsToDoc(ctx context.Context, docID string, msgsToMongo []model.MsgInfoModel) error + //PushMsgsToDoc(ctx context.Context, docID string, msgsToMongo []model.MsgInfoModel) error Create(ctx context.Context, model *model.MsgDocModel) error UpdateMsg(ctx context.Context, docID string, index int64, key string, value any) (*mongo.UpdateResult, error) PushUnique(ctx context.Context, docID string, index int64, key string, value any) (*mongo.UpdateResult, error)