Skip to content

Commit

Permalink
redis msg cache
Browse files Browse the repository at this point in the history
  • Loading branch information
withchao committed Dec 26, 2024
1 parent 2d9a945 commit ca8d551
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 17 deletions.
20 changes: 3 additions & 17 deletions internal/msgtransfer/online_msg_to_mongo_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,27 +77,13 @@ func (mc *OnlineHistoryMongoConsumerHandler) handleChatWs2Mongo(ctx context.Cont
for _, msg := range msgFromMQ.MsgData {
seqs = append(seqs, msg.Seq)
}
err = mc.msgTransferDatabase.DeleteMessagesFromCache(ctx, msgFromMQ.ConversationID, seqs)
if err != nil {
log.ZError(
ctx,
"remove cache msg from redis err",
err,
"msg",
msgFromMQ.MsgData,
"conversationID",
msgFromMQ.ConversationID,
)
}
}

func (*OnlineHistoryMongoConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil }
func (*OnlineHistoryMongoConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil }

func (*OnlineHistoryMongoConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }

func (mc *OnlineHistoryMongoConsumerHandler) ConsumeClaim(
sess sarama.ConsumerGroupSession,
claim sarama.ConsumerGroupClaim,
) error { // an instance in the consumer group
func (mc *OnlineHistoryMongoConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { // an instance in the consumer group
log.ZDebug(context.Background(), "online new session msg come", "highWaterMarkOffset",
claim.HighWaterMarkOffset(), "topic", claim.Topic(), "partition", claim.Partition())
for msg := range claim.Messages() {
Expand Down
1 change: 1 addition & 0 deletions pkg/common/storage/cache/msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,5 @@ type MsgCache interface {

GetMessageBySeqs(ctx context.Context, conversationID string, seqs []int64) ([]*model.MsgInfoModel, error)
DelMessageBySeqs(ctx context.Context, conversationID string, seqs []int64) error
SetMessageBySeqs(ctx context.Context, conversationID string, msgs []*model.MsgDataModel) error
}
17 changes: 17 additions & 0 deletions pkg/common/storage/cache/redis/msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package redis

import (
"context"
"encoding/json"
"github.com/dtm-labs/rockscache"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey"
Expand Down Expand Up @@ -77,3 +78,19 @@ func (c *msgCache) DelMessageBySeqs(ctx context.Context, conversationID string,
}
return nil
}

func (c *msgCache) SetMessageBySeqs(ctx context.Context, conversationID string, msgs []*model.MsgDataModel) error {
for _, msg := range msgs {
if msg == nil || msg.Seq <= 0 {
continue
}
data, err := json.Marshal(msg)
if err != nil {
return err
}
if err := c.rcClient.RawSet(ctx, cachekey.GetMsgCacheKey(conversationID, msg.Seq), string(data), msgCacheTimeout); err != nil {
return err
}
}
return nil
}
5 changes: 5 additions & 0 deletions pkg/common/storage/controller/msg_transfer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package controller

import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/common/convert"
"github.com/openimsdk/protocol/constant"
"github.com/openimsdk/tools/utils/datautil"

"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
Expand Down Expand Up @@ -252,6 +254,9 @@ func (db *msgTransferDatabase) BatchInsertChat2Cache(ctx context.Context, conver
if err := db.msgCache.DelMessageBySeqs(ctx, conversationID, seqs); err != nil {
return 0, false, nil, err
}
if err := db.msgCache.SetMessageBySeqs(ctx, conversationID, datautil.Slice(msgs, convert.MsgPb2DB)); err != nil {
return 0, false, nil, err
}
return lastMaxSeq, isNew, userSeqMap, nil
}

Expand Down

0 comments on commit ca8d551

Please sign in to comment.