diff --git a/internal/msgtransfer/online_msg_to_mongo_handler.go b/internal/msgtransfer/online_msg_to_mongo_handler.go index 82002c26b9..d8836d54eb 100644 --- a/internal/msgtransfer/online_msg_to_mongo_handler.go +++ b/internal/msgtransfer/online_msg_to_mongo_handler.go @@ -77,27 +77,13 @@ func (mc *OnlineHistoryMongoConsumerHandler) handleChatWs2Mongo(ctx context.Cont for _, msg := range msgFromMQ.MsgData { seqs = append(seqs, msg.Seq) } - err = mc.msgTransferDatabase.DeleteMessagesFromCache(ctx, msgFromMQ.ConversationID, seqs) - if err != nil { - log.ZError( - ctx, - "remove cache msg from redis err", - err, - "msg", - msgFromMQ.MsgData, - "conversationID", - msgFromMQ.ConversationID, - ) - } } -func (*OnlineHistoryMongoConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil } +func (*OnlineHistoryMongoConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil } + func (*OnlineHistoryMongoConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil } -func (mc *OnlineHistoryMongoConsumerHandler) ConsumeClaim( - sess sarama.ConsumerGroupSession, - claim sarama.ConsumerGroupClaim, -) error { // an instance in the consumer group +func (mc *OnlineHistoryMongoConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { // an instance in the consumer group log.ZDebug(context.Background(), "online new session msg come", "highWaterMarkOffset", claim.HighWaterMarkOffset(), "topic", claim.Topic(), "partition", claim.Partition()) for msg := range claim.Messages() { diff --git a/pkg/common/storage/cache/msg.go b/pkg/common/storage/cache/msg.go index 9a04f3cf82..f780cd5377 100644 --- a/pkg/common/storage/cache/msg.go +++ b/pkg/common/storage/cache/msg.go @@ -25,4 +25,5 @@ type MsgCache interface { GetMessageBySeqs(ctx context.Context, conversationID string, seqs []int64) ([]*model.MsgInfoModel, error) DelMessageBySeqs(ctx context.Context, conversationID string, seqs []int64) error + SetMessageBySeqs(ctx context.Context, conversationID string, msgs []*model.MsgDataModel) error } diff --git a/pkg/common/storage/cache/redis/msg.go b/pkg/common/storage/cache/redis/msg.go index 16eacc126b..b6799d5dcc 100644 --- a/pkg/common/storage/cache/redis/msg.go +++ b/pkg/common/storage/cache/redis/msg.go @@ -2,6 +2,7 @@ package redis import ( "context" + "encoding/json" "github.com/dtm-labs/rockscache" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey" @@ -77,3 +78,19 @@ func (c *msgCache) DelMessageBySeqs(ctx context.Context, conversationID string, } return nil } + +func (c *msgCache) SetMessageBySeqs(ctx context.Context, conversationID string, msgs []*model.MsgDataModel) error { + for _, msg := range msgs { + if msg == nil || msg.Seq <= 0 { + continue + } + data, err := json.Marshal(msg) + if err != nil { + return err + } + if err := c.rcClient.RawSet(ctx, cachekey.GetMsgCacheKey(conversationID, msg.Seq), string(data), msgCacheTimeout); err != nil { + return err + } + } + return nil +} diff --git a/pkg/common/storage/controller/msg_transfer.go b/pkg/common/storage/controller/msg_transfer.go index 58b3a24374..edb98c7f98 100644 --- a/pkg/common/storage/controller/msg_transfer.go +++ b/pkg/common/storage/controller/msg_transfer.go @@ -2,7 +2,9 @@ package controller import ( "context" + "github.com/openimsdk/open-im-server/v3/pkg/common/convert" "github.com/openimsdk/protocol/constant" + "github.com/openimsdk/tools/utils/datautil" "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" @@ -252,6 +254,9 @@ func (db *msgTransferDatabase) BatchInsertChat2Cache(ctx context.Context, conver if err := db.msgCache.DelMessageBySeqs(ctx, conversationID, seqs); err != nil { return 0, false, nil, err } + if err := db.msgCache.SetMessageBySeqs(ctx, conversationID, datautil.Slice(msgs, convert.MsgPb2DB)); err != nil { + return 0, false, nil, err + } return lastMaxSeq, isNew, userSeqMap, nil }