Skip to content

Commit

Permalink
redis msg cache
Browse files Browse the repository at this point in the history
  • Loading branch information
withchao committed Dec 26, 2024
1 parent 4d1655e commit 9e64d8a
Show file tree
Hide file tree
Showing 8 changed files with 39 additions and 332 deletions.
2 changes: 1 addition & 1 deletion internal/msgtransfer/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,11 @@ func Start(ctx context.Context, index int, config *Config) error {
}
client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin")))
msgModel := redis.NewMsgCache(rdb)
msgDocModel, err := mgo.NewMsgMongo(mgocli.GetDB())
if err != nil {
return err
}
msgModel := redis.NewMsgCache(rdb, msgDocModel)
seqConversation, err := mgo.NewSeqConversationMongo(mgocli.GetDB())
if err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion internal/rpc/msg/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg
if err != nil {
return err
}
msgModel := redis.NewMsgCache(rdb)
msgModel := redis.NewMsgCache(rdb, msgDocModel)
seqConversation, err := mgo.NewSeqConversationMongo(mgocli.GetDB())
if err != nil {
return err
Expand Down
11 changes: 3 additions & 8 deletions pkg/common/storage/cache/cachekey/msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,12 @@ import (
)

const (
messageCache = "MESSAGE_CACHE:"
sendMsgFailedFlag = "SEND_MSG_FAILED_FLAG:"
messageCacheV2 = "MESSAGE_CACHE_V2:"
messageCache = "MSG_CACHE:"
)

func GetMessageCacheKey(conversationID string, seq int64) string {
return messageCache + conversationID + "_" + strconv.Itoa(int(seq))
}

func GetMessageCacheKeyV2(conversationID string, seq int64) string {
return messageCacheV2 + conversationID + "_" + strconv.Itoa(int(seq))
func GetMsgCacheKey(conversationID string, seq int64) string {
return messageCache + conversationID + ":" + strconv.Itoa(int(seq))
}

func GetSendMsgKey(id string) string {
Expand Down
4 changes: 0 additions & 4 deletions pkg/common/storage/cache/msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,9 @@ package cache
import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/protocol/sdkws"
)

type MsgCache interface {
GetMessagesBySeq(ctx context.Context, conversationID string, seqs []int64) (seqMsg []*sdkws.MsgData, failedSeqList []int64, err error)
SetMessagesToCache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (int, error)
DeleteMessagesFromCache(ctx context.Context, conversationID string, seqs []int64) error
SetSendMsgStatus(ctx context.Context, id string, status int32) error
GetSendMsgStatus(ctx context.Context, id string) (int32, error)

Expand Down
92 changes: 8 additions & 84 deletions pkg/common/storage/cache/redis/msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/open-im-server/v3/pkg/msgprocessor"
"github.com/openimsdk/protocol/sdkws"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/utils/datautil"
"github.com/redis/go-redis/v9"
Expand All @@ -18,61 +16,24 @@ import (
// msgCacheTimeout is expiration time of message cache, 86400 seconds
const msgCacheTimeout = time.Hour * 24

func NewMsgCache(client redis.UniversalClient) cache.MsgCache {
return &msgCache{rdb: client}
func NewMsgCache(client redis.UniversalClient, db database.Msg) cache.MsgCache {
return &msgCache{
rdb: client,
rcClient: rockscache.NewClient(client, *GetRocksCacheOptions()),
msgDocDatabase: db,
}
}

type msgCache struct {
rdb redis.UniversalClient
rcClient *rockscache.Client
msgDocDatabase database.Msg
msgTable model.MsgDocModel
}

func (c *msgCache) getMessageCacheKey(conversationID string, seq int64) string {
return cachekey.GetMessageCacheKey(conversationID, seq)
}

func (c *msgCache) getSendMsgKey(id string) string {
return cachekey.GetSendMsgKey(id)
}

func (c *msgCache) SetMessagesToCache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (int, error) {
msgMap := datautil.SliceToMap(msgs, func(msg *sdkws.MsgData) string {
return c.getMessageCacheKey(conversationID, msg.Seq)
})
keys := datautil.Slice(msgs, func(msg *sdkws.MsgData) string {
return c.getMessageCacheKey(conversationID, msg.Seq)
})
err := ProcessKeysBySlot(ctx, c.rdb, keys, func(ctx context.Context, slot int64, keys []string) error {
var values []string
for _, key := range keys {
if msg, ok := msgMap[key]; ok {
s, err := msgprocessor.Pb2String(msg)
if err != nil {
return err
}
values = append(values, s)
}
}
return LuaSetBatchWithCommonExpire(ctx, c.rdb, keys, values, int(msgCacheTimeout/time.Second))
})
if err != nil {
return 0, err
}
return len(msgs), nil
}

func (c *msgCache) DeleteMessagesFromCache(ctx context.Context, conversationID string, seqs []int64) error {
var keys []string
for _, seq := range seqs {
keys = append(keys, c.getMessageCacheKey(conversationID, seq))
}
return ProcessKeysBySlot(ctx, c.rdb, keys, func(ctx context.Context, slot int64, keys []string) error {
return LuaDeleteBatch(ctx, c.rdb, keys)
})
}

func (c *msgCache) SetSendMsgStatus(ctx context.Context, id string, status int32) error {
return errs.Wrap(c.rdb.Set(ctx, c.getSendMsgKey(id), status, time.Hour*24).Err())
}
Expand All @@ -82,49 +43,12 @@ func (c *msgCache) GetSendMsgStatus(ctx context.Context, id string) (int32, erro
return int32(result), errs.Wrap(err)
}

func (c *msgCache) GetMessagesBySeq(ctx context.Context, conversationID string, seqs []int64) (seqMsgs []*sdkws.MsgData, failedSeqs []int64, err error) {
var keys []string
keySeqMap := make(map[string]int64, 10)
for _, seq := range seqs {
key := c.getMessageCacheKey(conversationID, seq)
keys = append(keys, key)
keySeqMap[key] = seq
}
err = ProcessKeysBySlot(ctx, c.rdb, keys, func(ctx context.Context, slot int64, keys []string) error {
result, err := LuaGetBatch(ctx, c.rdb, keys)
if err != nil {
return err
}
for i, value := range result {
seq := keySeqMap[keys[i]]
if value == nil {
failedSeqs = append(failedSeqs, seq)
continue
}

msg := &sdkws.MsgData{}
msgString, ok := value.(string)
if !ok || msgprocessor.String2Pb(msgString, msg) != nil {
failedSeqs = append(failedSeqs, seq)
continue
}
seqMsgs = append(seqMsgs, msg)

}
return nil
})
if err != nil {
return nil, nil, err
}
return seqMsgs, failedSeqs, nil
}

func (c *msgCache) GetMessageBySeqs(ctx context.Context, conversationID string, seqs []int64) ([]*model.MsgInfoModel, error) {
if len(seqs) == 0 {
return nil, nil
}
getKey := func(seq int64) string {
return cachekey.GetMessageCacheKeyV2(conversationID, seq)
return cachekey.GetMsgCacheKey(conversationID, seq)
}
getMsgID := func(msg *model.MsgInfoModel) int64 {
return msg.Msg.Seq
Expand All @@ -140,7 +64,7 @@ func (c *msgCache) DelMessageBySeqs(ctx context.Context, conversationID string,
return nil
}
keys := datautil.Slice(seqs, func(seq int64) string {
return cachekey.GetMessageCacheKeyV2(conversationID, seq)
return cachekey.GetMsgCacheKey(conversationID, seq)
})
slotKeys, err := groupKeysBySlot(ctx, getRocksCacheRedisClient(c.rcClient), keys)
if err != nil {
Expand Down
133 changes: 0 additions & 133 deletions pkg/common/storage/cache/redis/msg_test.go

This file was deleted.

Loading

0 comments on commit 9e64d8a

Please sign in to comment.