Skip to content

Commit

Permalink
feat: support message cache (openimsdk#3007)
Browse files Browse the repository at this point in the history
* pb

* fix: Modifying other fields while setting IsPrivateChat does not take effect

* fix: quote message error revoke

* refactoring scheduled tasks

* refactoring scheduled tasks

* refactoring scheduled tasks

* refactoring scheduled tasks

* refactoring scheduled tasks

* refactoring scheduled tasks

* upgrading pkg tools

* redis msg cache

* redis msg cache

* redis msg cache

* redis msg cache

* redis msg cache

* redis msg cache

* redis msg cache
  • Loading branch information
withchao authored and OpenIM-Robot committed Dec 27, 2024
1 parent 52052a9 commit f9e6d07
Show file tree
Hide file tree
Showing 14 changed files with 292 additions and 1,067 deletions.
17 changes: 16 additions & 1 deletion internal/msgtransfer/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,26 @@ func Start(ctx context.Context, index int, config *Config) error {
}
client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin")))
msgModel := redis.NewMsgCache(rdb)

if config.Discovery.Enable == conf.ETCD {
cm := disetcd.NewConfigManager(client.(*etcd.SvcDiscoveryRegistryImpl).GetClient(), []string{
config.MsgTransfer.GetConfigFileName(),
config.RedisConfig.GetConfigFileName(),
config.MongodbConfig.GetConfigFileName(),
config.KafkaConfig.GetConfigFileName(),
config.Share.GetConfigFileName(),
config.WebhooksConfig.GetConfigFileName(),
config.Discovery.GetConfigFileName(),
conf.LogConfigFileName,
})
cm.Watch(ctx)
}

msgDocModel, err := mgo.NewMsgMongo(mgocli.GetDB())
if err != nil {
return err
}
msgModel := redis.NewMsgCache(rdb, msgDocModel)
seqConversation, err := mgo.NewSeqConversationMongo(mgocli.GetDB())
if err != nil {
return err
Expand Down
20 changes: 3 additions & 17 deletions internal/msgtransfer/online_msg_to_mongo_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,27 +77,13 @@ func (mc *OnlineHistoryMongoConsumerHandler) handleChatWs2Mongo(ctx context.Cont
for _, msg := range msgFromMQ.MsgData {
seqs = append(seqs, msg.Seq)
}
err = mc.msgTransferDatabase.DeleteMessagesFromCache(ctx, msgFromMQ.ConversationID, seqs)
if err != nil {
log.ZError(
ctx,
"remove cache msg from redis err",
err,
"msg",
msgFromMQ.MsgData,
"conversationID",
msgFromMQ.ConversationID,
)
}
}

func (*OnlineHistoryMongoConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil }
func (*OnlineHistoryMongoConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil }

func (*OnlineHistoryMongoConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }

func (mc *OnlineHistoryMongoConsumerHandler) ConsumeClaim(
sess sarama.ConsumerGroupSession,
claim sarama.ConsumerGroupClaim,
) error { // an instance in the consumer group
func (mc *OnlineHistoryMongoConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { // an instance in the consumer group
log.ZDebug(context.Background(), "online new session msg come", "highWaterMarkOffset",
claim.HighWaterMarkOffset(), "topic", claim.Topic(), "partition", claim.Partition())
for msg := range claim.Messages() {
Expand Down
6 changes: 2 additions & 4 deletions internal/rpc/msg/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,10 +106,8 @@ func (m *msgServer) DeleteMsgPhysical(ctx context.Context, req *msg.DeleteMsgPhy
return nil, err
}
remainTime := timeutil.GetCurrentTimestampBySecond() - req.Timestamp
for _, conversationID := range req.ConversationIDs {
if err := m.MsgDatabase.DeleteConversationMsgsAndSetMinSeq(ctx, conversationID, remainTime); err != nil {
log.ZWarn(ctx, "DeleteConversationMsgsAndSetMinSeq error", err, "conversationID", conversationID, "err", err)
}
if _, err := m.DestructMsgs(ctx, &msg.DestructMsgsReq{Timestamp: remainTime, Limit: 9999}); err != nil {
return nil, err
}
return &msg.DeleteMsgPhysicalResp{}, nil
}
Expand Down
5 changes: 3 additions & 2 deletions internal/rpc/msg/revoke.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ func (m *msgServer) RevokeMsg(ctx context.Context, req *msg.RevokeMsgReq) (*msg.
log.ZDebug(ctx, "GetMsgBySeqs", "conversationID", req.ConversationID, "seq", req.Seq, "msg", string(data))
var role int32
if !authverify.IsAppManagerUid(ctx, m.config.Share.IMAdminUserID) {
switch msgs[0].SessionType {
sessionType := msgs[0].SessionType
switch sessionType {
case constant.SingleChatType:
if err := authverify.CheckAccessV3(ctx, msgs[0].SendID, m.config.Share.IMAdminUserID); err != nil {
return nil, err
Expand All @@ -89,7 +90,7 @@ func (m *msgServer) RevokeMsg(ctx context.Context, req *msg.RevokeMsgReq) (*msg.
role = member.RoleLevel
}
default:
return nil, errs.ErrInternalServer.WrapMsg("msg sessionType not supported")
return nil, errs.ErrInternalServer.WrapMsg("msg sessionType not supported", "sessionType", sessionType)
}
}
now := time.Now().UnixMilli()
Expand Down
6 changes: 1 addition & 5 deletions internal/rpc/msg/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,7 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg
if err != nil {
return err
}
msgModel := redis.NewMsgCache(rdb)
conversationClient := rpcclient.NewConversationRpcClient(client, config.Share.RpcRegisterName.Conversation)
userRpcClient := rpcclient.NewUserRpcClient(client, config.Share.RpcRegisterName.User, config.Share.IMAdminUserID)
groupRpcClient := rpcclient.NewGroupRpcClient(client, config.Share.RpcRegisterName.Group)
friendRpcClient := rpcclient.NewFriendRpcClient(client, config.Share.RpcRegisterName.Friend)
msgModel := redis.NewMsgCache(rdb, msgDocModel)
seqConversation, err := mgo.NewSeqConversationMongo(mgocli.GetDB())
if err != nil {
return err
Expand Down
42 changes: 4 additions & 38 deletions pkg/common/storage/cache/cachekey/msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,50 +15,16 @@
package cachekey

import (
"github.com/openimsdk/protocol/constant"
"strconv"
)

const (
messageCache = "MESSAGE_CACHE:"
messageDelUserList = "MESSAGE_DEL_USER_LIST:"
userDelMessagesList = "USER_DEL_MESSAGES_LIST:"
sendMsgFailedFlag = "SEND_MSG_FAILED_FLAG:"
exTypeKeyLocker = "EX_LOCK:"
reactionExSingle = "EX_SINGLE_"
reactionWriteGroup = "EX_GROUP_"
reactionReadGroup = "EX_SUPER_GROUP_"
reactionNotification = "EX_NOTIFICATION_"
sendMsgFailedFlag = "SEND_MSG_FAILED_FLAG:"
messageCache = "MSG_CACHE:"
)

func GetMessageCacheKey(conversationID string, seq int64) string {
return messageCache + conversationID + "_" + strconv.Itoa(int(seq))
}

func GetMessageDelUserListKey(conversationID string, seq int64) string {
return messageDelUserList + conversationID + ":" + strconv.Itoa(int(seq))
}

func GetUserDelListKey(conversationID, userID string) string {
return userDelMessagesList + conversationID + ":" + userID
}

func GetMessageReactionExKey(clientMsgID string, sessionType int32) string {
switch sessionType {
case constant.SingleChatType:
return reactionExSingle + clientMsgID
case constant.WriteGroupChatType:
return reactionWriteGroup + clientMsgID
case constant.ReadGroupChatType:
return reactionReadGroup + clientMsgID
case constant.NotificationChatType:
return reactionNotification + clientMsgID
}

return ""
}
func GetLockMessageTypeKey(clientMsgID string, TypeKey string) string {
return exTypeKeyLocker + clientMsgID + "_" + TypeKey
func GetMsgCacheKey(conversationID string, seq int64) string {
return messageCache + conversationID + ":" + strconv.Itoa(int(seq))
}

func GetSendMsgKey(id string) string {
Expand Down
19 changes: 5 additions & 14 deletions pkg/common/storage/cache/msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,14 @@ package cache

import (
"context"
"time"

"github.com/openimsdk/protocol/sdkws"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
)

type MsgCache interface {
GetMessagesBySeq(ctx context.Context, conversationID string, seqs []int64) (seqMsg []*sdkws.MsgData, failedSeqList []int64, err error)
SetMessagesToCache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (int, error)
DeleteMessagesFromCache(ctx context.Context, conversationID string, seqs []int64) error
SetSendMsgStatus(ctx context.Context, id string, status int32) error
GetSendMsgStatus(ctx context.Context, id string) (int32, error)
JudgeMessageReactionExist(ctx context.Context, clientMsgID string, sessionType int32) (bool, error)
GetOneMessageAllReactionList(ctx context.Context, clientMsgID string, sessionType int32) (map[string]string, error)
DeleteOneMessageKey(ctx context.Context, clientMsgID string, sessionType int32, subKey string) error
SetMessageReactionExpire(ctx context.Context, clientMsgID string, sessionType int32, expiration time.Duration) (bool, error)
GetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey string) (string, error)
SetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey, value string) error
LockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error
UnLockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error

GetMessageBySeqs(ctx context.Context, conversationID string, seqs []int64) ([]*model.MsgInfoModel, error)
DelMessageBySeqs(ctx context.Context, conversationID string, seqs []int64) error
SetMessageBySeqs(ctx context.Context, conversationID string, msgs []*model.MsgInfoModel) error
}
Loading

0 comments on commit f9e6d07

Please sign in to comment.