From 84049a1f55632903fb51afcd27967931384771e8 Mon Sep 17 00:00:00 2001 From: chao <48119764+withchao@users.noreply.github.com> Date: Fri, 2 Aug 2024 16:58:04 +0800 Subject: [PATCH 1/3] fix: the local cache obtained can be modified (#2473) * fix: GroupApplicationAcceptedNotification * fix: GroupApplicationAcceptedNotification * fix: NotificationUserInfoUpdate * cicd: robot automated Change * fix: component * fix: getConversationInfo * feat: cron task * feat: cron task * feat: cron task * feat: cron task * feat: cron task * fix: minio config url recognition error * new mongo * new mongo * new mongo * new mongo * new mongo * new mongo * new mongo * new mongo * friend incr sync * friend incr sync * friend incr sync * friend incr sync * friend incr sync * mage * optimization version log * optimization version log * sync * sync * sync * group sync * sync option * sync option * refactor: replace `friend` package with `realtion`. * refactor: update lastest commit to relation. * sync option * sync option * sync option * sync * sync * go.mod * seq * update: go mod * refactor: change incremental to full * feat: get full friend user ids * feat: api and config * seq * group version * merge * seq * seq * seq * fix: sort by id avoid unstable sort friends. * group * group * group * fix: sort by id avoid unstable sort friends. * fix: sort by id avoid unstable sort friends. * fix: sort by id avoid unstable sort friends. * user version * seq * seq * seq user * user online * implement minio expire delete. * user online * config * fix * fix * implement minio expire delete logic. * online cache * online cache * online cache * online cache * online cache * online cache * online cache * online cache * online cache * online cache * online cache * online cache * feat: implement scheduled delete outdated object in minio. * update gomake version * update gomake version * implement FindExpires pagination. * remove unnesseary incr. * fix uncorrect args call. * online push * online push * online push * resolving conflicts * resolving conflicts * test * api prommetrics * api prommetrics * api prommetrics * api prommetrics * api prommetrics * rpc prommetrics * rpc prommetrics * online status * online status * online status * online status * sub * conversation version incremental * merge seq * merge online * merge online * merge online * merge seq * GetOwnerConversation * fix: change incremental syncer router name. * rockscache batch get * rockscache seq batch get * fix: GetMsgDocModelByIndex bug * update go.mod * update go.mod * merge * feat: prometheus * feat: prometheus * group member sort * sub * sub * fix: seq conversion bug * fix: redis pipe exec * sort version * sort version * sort version * remove old version online subscription * remove old version online subscription * version log index * version log index * batch push * batch push * seq void filling * fix: batchGetMaxSeq * fix: batchGetMaxSeq * cache db error log * 111 * fix bug * fix: ImportFriends * add online cache * add some logs * add some logs * fix: onlineUserIDs * add logs * test * test * test * test * add log * feat: solve the problem that modifying the cached data affects other * feat: solve the problem that modifying the cached data affects other * feat: search messages to filter out notifications --------- Co-authored-by: withchao Co-authored-by: Monet Lee Co-authored-by: OpenIM-Gordon <46924906+FGadvancer@users.noreply.github.com> Co-authored-by: icey-yu <1186114839@qq.com> --- internal/msggateway/user_map.go | 5 --- pkg/common/storage/database/mgo/msg.go | 11 ++++++ pkg/rpccache/common.go | 43 ++++++++++++++++++++ pkg/rpccache/conversation.go | 55 ++++++++++++++++++-------- pkg/rpccache/friend.go | 52 ++++++++++++++++-------- pkg/rpccache/group.go | 39 ++++++++++-------- pkg/rpccache/online.go | 14 ++++++- pkg/rpccache/user.go | 47 ++++++++++------------ 8 files changed, 183 insertions(+), 83 deletions(-) diff --git a/internal/msggateway/user_map.go b/internal/msggateway/user_map.go index 36cab4ed75..5baa4f9012 100644 --- a/internal/msggateway/user_map.go +++ b/internal/msggateway/user_map.go @@ -1,10 +1,6 @@ package msggateway import ( - "context" - "fmt" - "github.com/openimsdk/protocol/constant" - "github.com/openimsdk/tools/log" "github.com/openimsdk/tools/utils/datautil" "sync" "time" @@ -121,7 +117,6 @@ func (u *userMap) Get(userID string, platformID int) ([]*Client, bool, bool) { } func (u *userMap) Set(userID string, client *Client) { - log.ZDebug(context.Background(), "userMap Set", "userID", userID, "platformID", client.PlatformID, "platform", constant.PlatformIDToName(client.PlatformID), "pointer", fmt.Sprintf("%p", client)) u.lock.Lock() defer u.lock.Unlock() result, ok := u.data[userID] diff --git a/pkg/common/storage/database/mgo/msg.go b/pkg/common/storage/database/mgo/msg.go index 03f47c5033..7dc308a7c4 100644 --- a/pkg/common/storage/database/mgo/msg.go +++ b/pkg/common/storage/database/mgo/msg.go @@ -377,8 +377,19 @@ func (m *MsgMgo) searchMessageIndex(ctx context.Context, filter any, nextID prim if !nextID.IsZero() { pipeline = append(pipeline, bson.M{"$match": bson.M{"_id": bson.M{"$gt": nextID}}}) } + coarseFilter := bson.M{ + "$or": bson.A{ + bson.M{ + "doc_id": primitive.Regex{Pattern: "^sg_"}, + }, + bson.M{ + "doc_id": primitive.Regex{Pattern: "^si_"}, + }, + }, + } pipeline = append(pipeline, bson.M{"$sort": bson.M{"_id": 1}}, + bson.M{"$match": coarseFilter}, bson.M{"$match": filter}, bson.M{"$limit": limit}, bson.M{ diff --git a/pkg/rpccache/common.go b/pkg/rpccache/common.go index 5ed007edc4..15b3a8e094 100644 --- a/pkg/rpccache/common.go +++ b/pkg/rpccache/common.go @@ -14,6 +14,11 @@ package rpccache +import ( + "github.com/openimsdk/tools/errs" + "google.golang.org/protobuf/proto" +) + func newListMap[V comparable](values []V, err error) (*listMap[V], error) { if err != nil { return nil, err @@ -32,3 +37,41 @@ type listMap[V comparable] struct { List []V Map map[V]struct{} } + +func respProtoMarshal(resp proto.Message, err error) ([]byte, error) { + if err != nil { + return nil, err + } + return proto.Marshal(resp) +} + +func cacheUnmarshal[V any](resp []byte, err error) (*V, error) { + if err != nil { + return nil, err + } + var val V + if err := proto.Unmarshal(resp, any(&val).(proto.Message)); err != nil { + return nil, errs.WrapMsg(err, "local cache proto.Unmarshal error") + } + return &val, nil +} + +type cacheProto[V any] struct{} + +func (cacheProto[V]) Marshal(resp *V, err error) ([]byte, error) { + if err != nil { + return nil, err + } + return proto.Marshal(any(resp).(proto.Message)) +} + +func (cacheProto[V]) Unmarshal(resp []byte, err error) (*V, error) { + if err != nil { + return nil, err + } + var val V + if err := proto.Unmarshal(resp, any(&val).(proto.Message)); err != nil { + return nil, errs.WrapMsg(err, "local cache proto.Unmarshal error") + } + return &val, nil +} diff --git a/pkg/rpccache/conversation.go b/pkg/rpccache/conversation.go index 0109f1b1dd..2a62c7bbd5 100644 --- a/pkg/rpccache/conversation.go +++ b/pkg/rpccache/conversation.go @@ -23,6 +23,7 @@ import ( pbconversation "github.com/openimsdk/protocol/conversation" "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/log" + "github.com/openimsdk/tools/utils/datautil" "github.com/redis/go-redis/v9" "golang.org/x/sync/errgroup" ) @@ -36,7 +37,7 @@ func NewConversationLocalCache(client rpcclient.ConversationRpcClient, localCach log.ZDebug(context.Background(), "ConversationLocalCache", "topic", lc.Topic, "slotNum", lc.SlotNum, "slotSize", lc.SlotSize, "enable", lc.Enable()) x := &ConversationLocalCache{ client: client, - local: localcache.New[any]( + local: localcache.New[[]byte]( localcache.WithLocalSlotNum(lc.SlotNum), localcache.WithLocalSlotSize(lc.SlotSize), localcache.WithLinkSlotNum(lc.SlotNum), @@ -52,21 +53,30 @@ func NewConversationLocalCache(client rpcclient.ConversationRpcClient, localCach type ConversationLocalCache struct { client rpcclient.ConversationRpcClient - local localcache.Cache[any] + local localcache.Cache[[]byte] } func (c *ConversationLocalCache) GetConversationIDs(ctx context.Context, ownerUserID string) (val []string, err error) { - log.ZDebug(ctx, "ConversationLocalCache GetConversationIDs req", "ownerUserID", ownerUserID) + resp, err := c.getConversationIDs(ctx, ownerUserID) + if err != nil { + return nil, err + } + return resp.ConversationIDs, nil +} + +func (c *ConversationLocalCache) getConversationIDs(ctx context.Context, ownerUserID string) (val *pbconversation.GetConversationIDsResp, err error) { + log.ZDebug(ctx, "ConversationLocalCache getConversationIDs req", "ownerUserID", ownerUserID) defer func() { if err == nil { - log.ZDebug(ctx, "ConversationLocalCache GetConversationIDs return", "value", val) + log.ZDebug(ctx, "ConversationLocalCache getConversationIDs return", "ownerUserID", ownerUserID, "value", val) } else { - log.ZError(ctx, "ConversationLocalCache GetConversationIDs return", err) + log.ZError(ctx, "ConversationLocalCache getConversationIDs return", err, "ownerUserID", ownerUserID) } }() - return localcache.AnyValue[[]string](c.local.Get(ctx, cachekey.GetConversationIDsKey(ownerUserID), func(ctx context.Context) (any, error) { - log.ZDebug(ctx, "ConversationLocalCache GetConversationIDs rpc", "ownerUserID", ownerUserID) - return c.client.GetConversationIDs(ctx, ownerUserID) + var cache cacheProto[pbconversation.GetConversationIDsResp] + return cache.Unmarshal(c.local.Get(ctx, cachekey.GetConversationIDsKey(ownerUserID), func(ctx context.Context) ([]byte, error) { + log.ZDebug(ctx, "ConversationLocalCache getConversationIDs rpc", "ownerUserID", ownerUserID) + return cache.Marshal(c.client.Client.GetConversationIDs(ctx, &pbconversation.GetConversationIDsReq{UserID: ownerUserID})) })) } @@ -74,14 +84,15 @@ func (c *ConversationLocalCache) GetConversation(ctx context.Context, userID, co log.ZDebug(ctx, "ConversationLocalCache GetConversation req", "userID", userID, "conversationID", conversationID) defer func() { if err == nil { - log.ZDebug(ctx, "ConversationLocalCache GetConversation return", "value", val) + log.ZDebug(ctx, "ConversationLocalCache GetConversation return", "userID", userID, "conversationID", conversationID, "value", val) } else { - log.ZError(ctx, "ConversationLocalCache GetConversation return", err) + log.ZError(ctx, "ConversationLocalCache GetConversation return", err, "userID", userID, "conversationID", conversationID) } }() - return localcache.AnyValue[*pbconversation.Conversation](c.local.Get(ctx, cachekey.GetConversationKey(userID, conversationID), func(ctx context.Context) (any, error) { + var cache cacheProto[pbconversation.Conversation] + return cache.Unmarshal(c.local.Get(ctx, cachekey.GetConversationKey(userID, conversationID), func(ctx context.Context) ([]byte, error) { log.ZDebug(ctx, "ConversationLocalCache GetConversation rpc", "userID", userID, "conversationID", conversationID) - return c.client.GetConversation(ctx, userID, conversationID) + return cache.Marshal(c.client.GetConversation(ctx, userID, conversationID)) })) } @@ -126,9 +137,19 @@ func (c *ConversationLocalCache) GetConversations(ctx context.Context, ownerUser return conversations, nil } -func (c *ConversationLocalCache) getConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) (*listMap[string], error) { - return localcache.AnyValue[*listMap[string]](c.local.Get(ctx, cachekey.GetConversationNotReceiveMessageUserIDsKey(conversationID), func(ctx context.Context) (any, error) { - return newListMap(c.client.GetConversationNotReceiveMessageUserIDs(ctx, conversationID)) +func (c *ConversationLocalCache) getConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) (val *pbconversation.GetConversationNotReceiveMessageUserIDsResp, err error) { + log.ZDebug(ctx, "ConversationLocalCache getConversationNotReceiveMessageUserIDs req", "conversationID", conversationID) + defer func() { + if err == nil { + log.ZDebug(ctx, "ConversationLocalCache getConversationNotReceiveMessageUserIDs return", "conversationID", conversationID, "value", val) + } else { + log.ZError(ctx, "ConversationLocalCache getConversationNotReceiveMessageUserIDs return", err, "conversationID", conversationID) + } + }() + var cache cacheProto[pbconversation.GetConversationNotReceiveMessageUserIDsResp] + return cache.Unmarshal(c.local.Get(ctx, cachekey.GetConversationNotReceiveMessageUserIDsKey(conversationID), func(ctx context.Context) ([]byte, error) { + log.ZDebug(ctx, "ConversationLocalCache getConversationNotReceiveMessageUserIDs rpc", "conversationID", conversationID) + return cache.Marshal(c.client.Client.GetConversationNotReceiveMessageUserIDs(ctx, &pbconversation.GetConversationNotReceiveMessageUserIDsReq{ConversationID: conversationID})) })) } @@ -137,7 +158,7 @@ func (c *ConversationLocalCache) GetConversationNotReceiveMessageUserIDs(ctx con if err != nil { return nil, err } - return res.List, nil + return res.UserIDs, nil } func (c *ConversationLocalCache) GetConversationNotReceiveMessageUserIDMap(ctx context.Context, conversationID string) (map[string]struct{}, error) { @@ -145,5 +166,5 @@ func (c *ConversationLocalCache) GetConversationNotReceiveMessageUserIDMap(ctx c if err != nil { return nil, err } - return res.Map, nil + return datautil.SliceSet(res.UserIDs), nil } diff --git a/pkg/rpccache/friend.go b/pkg/rpccache/friend.go index a5cee2567c..dca3b4c97c 100644 --- a/pkg/rpccache/friend.go +++ b/pkg/rpccache/friend.go @@ -16,7 +16,8 @@ package rpccache import ( "context" - cachekey2 "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey" + "github.com/openimsdk/protocol/relation" "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/localcache" @@ -30,7 +31,7 @@ func NewFriendLocalCache(client rpcclient.FriendRpcClient, localCache *config.Lo log.ZDebug(context.Background(), "FriendLocalCache", "topic", lc.Topic, "slotNum", lc.SlotNum, "slotSize", lc.SlotSize, "enable", lc.Enable()) x := &FriendLocalCache{ client: client, - local: localcache.New[any]( + local: localcache.New[[]byte]( localcache.WithLocalSlotNum(lc.SlotNum), localcache.WithLocalSlotSize(lc.SlotSize), localcache.WithLinkSlotNum(lc.SlotNum), @@ -46,36 +47,55 @@ func NewFriendLocalCache(client rpcclient.FriendRpcClient, localCache *config.Lo type FriendLocalCache struct { client rpcclient.FriendRpcClient - local localcache.Cache[any] + local localcache.Cache[[]byte] } func (f *FriendLocalCache) IsFriend(ctx context.Context, possibleFriendUserID, userID string) (val bool, err error) { - log.ZDebug(ctx, "FriendLocalCache IsFriend req", "possibleFriendUserID", possibleFriendUserID, "userID", userID) + res, err := f.isFriend(ctx, possibleFriendUserID, userID) + if err != nil { + return false, err + } + return res.InUser1Friends, nil +} + +func (f *FriendLocalCache) isFriend(ctx context.Context, possibleFriendUserID, userID string) (val *relation.IsFriendResp, err error) { + log.ZDebug(ctx, "FriendLocalCache isFriend req", "possibleFriendUserID", possibleFriendUserID, "userID", userID) defer func() { if err == nil { - log.ZDebug(ctx, "FriendLocalCache IsFriend return", "value", val) + log.ZDebug(ctx, "FriendLocalCache isFriend return", "possibleFriendUserID", possibleFriendUserID, "userID", userID, "value", val) } else { - log.ZError(ctx, "FriendLocalCache IsFriend return", err) + log.ZError(ctx, "FriendLocalCache isFriend return", err, "possibleFriendUserID", possibleFriendUserID, "userID", userID) } }() - return localcache.AnyValue[bool](f.local.GetLink(ctx, cachekey2.GetIsFriendKey(possibleFriendUserID, userID), func(ctx context.Context) (any, error) { - log.ZDebug(ctx, "FriendLocalCache IsFriend rpc", "possibleFriendUserID", possibleFriendUserID, "userID", userID) - return f.client.IsFriend(ctx, possibleFriendUserID, userID) - }, cachekey2.GetFriendIDsKey(possibleFriendUserID))) + var cache cacheProto[relation.IsFriendResp] + return cache.Unmarshal(f.local.GetLink(ctx, cachekey.GetIsFriendKey(possibleFriendUserID, userID), func(ctx context.Context) ([]byte, error) { + log.ZDebug(ctx, "FriendLocalCache isFriend rpc", "possibleFriendUserID", possibleFriendUserID, "userID", userID) + return cache.Marshal(f.client.Client.IsFriend(ctx, &relation.IsFriendReq{UserID1: userID, UserID2: possibleFriendUserID})) + }, cachekey.GetFriendIDsKey(possibleFriendUserID))) } // IsBlack possibleBlackUserID selfUserID. func (f *FriendLocalCache) IsBlack(ctx context.Context, possibleBlackUserID, userID string) (val bool, err error) { - log.ZDebug(ctx, "FriendLocalCache IsBlack req", "possibleBlackUserID", possibleBlackUserID, "userID", userID) + res, err := f.isBlack(ctx, possibleBlackUserID, userID) + if err != nil { + return false, err + } + return res.InUser2Blacks, nil +} + +// IsBlack possibleBlackUserID selfUserID. +func (f *FriendLocalCache) isBlack(ctx context.Context, possibleBlackUserID, userID string) (val *relation.IsBlackResp, err error) { + log.ZDebug(ctx, "FriendLocalCache isBlack req", "possibleBlackUserID", possibleBlackUserID, "userID", userID) defer func() { if err == nil { - log.ZDebug(ctx, "FriendLocalCache IsBlack return", "value", val) + log.ZDebug(ctx, "FriendLocalCache isBlack return", "possibleBlackUserID", possibleBlackUserID, "userID", userID, "value", val) } else { - log.ZError(ctx, "FriendLocalCache IsBlack return", err) + log.ZError(ctx, "FriendLocalCache isBlack return", err, "possibleBlackUserID", possibleBlackUserID, "userID", userID) } }() - return localcache.AnyValue[bool](f.local.GetLink(ctx, cachekey2.GetIsBlackIDsKey(possibleBlackUserID, userID), func(ctx context.Context) (any, error) { + var cache cacheProto[relation.IsBlackResp] + return cache.Unmarshal(f.local.GetLink(ctx, cachekey.GetIsBlackIDsKey(possibleBlackUserID, userID), func(ctx context.Context) ([]byte, error) { log.ZDebug(ctx, "FriendLocalCache IsBlack rpc", "possibleBlackUserID", possibleBlackUserID, "userID", userID) - return f.client.IsBlack(ctx, possibleBlackUserID, userID) - }, cachekey2.GetBlackIDsKey(userID))) + return cache.Marshal(f.client.Client.IsBlack(ctx, &relation.IsBlackReq{UserID1: possibleBlackUserID, UserID2: userID})) + }, cachekey.GetBlackIDsKey(userID))) } diff --git a/pkg/rpccache/group.go b/pkg/rpccache/group.go index 55e1438be3..b2d852fc5f 100644 --- a/pkg/rpccache/group.go +++ b/pkg/rpccache/group.go @@ -17,6 +17,8 @@ package rpccache import ( "context" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey" + "github.com/openimsdk/protocol/group" + "github.com/openimsdk/tools/utils/datautil" "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/localcache" @@ -32,7 +34,7 @@ func NewGroupLocalCache(client rpcclient.GroupRpcClient, localCache *config.Loca log.ZDebug(context.Background(), "GroupLocalCache", "topic", lc.Topic, "slotNum", lc.SlotNum, "slotSize", lc.SlotSize, "enable", lc.Enable()) x := &GroupLocalCache{ client: client, - local: localcache.New[any]( + local: localcache.New[[]byte]( localcache.WithLocalSlotNum(lc.SlotNum), localcache.WithLocalSlotSize(lc.SlotSize), localcache.WithLinkSlotNum(lc.SlotNum), @@ -48,21 +50,22 @@ func NewGroupLocalCache(client rpcclient.GroupRpcClient, localCache *config.Loca type GroupLocalCache struct { client rpcclient.GroupRpcClient - local localcache.Cache[any] + local localcache.Cache[[]byte] } -func (g *GroupLocalCache) getGroupMemberIDs(ctx context.Context, groupID string) (val *listMap[string], err error) { +func (g *GroupLocalCache) getGroupMemberIDs(ctx context.Context, groupID string) (val *group.GetGroupMemberUserIDsResp, err error) { log.ZDebug(ctx, "GroupLocalCache getGroupMemberIDs req", "groupID", groupID) defer func() { if err == nil { - log.ZDebug(ctx, "GroupLocalCache getGroupMemberIDs return", "value", val) + log.ZDebug(ctx, "GroupLocalCache getGroupMemberIDs return", "groupID", groupID, "value", val) } else { - log.ZError(ctx, "GroupLocalCache getGroupMemberIDs return", err) + log.ZError(ctx, "GroupLocalCache getGroupMemberIDs return", err, "groupID", groupID) } }() - return localcache.AnyValue[*listMap[string]](g.local.Get(ctx, cachekey.GetGroupMemberIDsKey(groupID), func(ctx context.Context) (any, error) { + var cache cacheProto[group.GetGroupMemberUserIDsResp] + return cache.Unmarshal(g.local.Get(ctx, cachekey.GetGroupMemberIDsKey(groupID), func(ctx context.Context) ([]byte, error) { log.ZDebug(ctx, "GroupLocalCache getGroupMemberIDs rpc", "groupID", groupID) - return newListMap(g.client.GetGroupMemberIDs(ctx, groupID)) + return cache.Marshal(g.client.Client.GetGroupMemberUserIDs(ctx, &group.GetGroupMemberUserIDsReq{GroupID: groupID})) })) } @@ -70,14 +73,15 @@ func (g *GroupLocalCache) GetGroupMember(ctx context.Context, groupID, userID st log.ZDebug(ctx, "GroupLocalCache GetGroupInfo req", "groupID", groupID, "userID", userID) defer func() { if err == nil { - log.ZDebug(ctx, "GroupLocalCache GetGroupInfo return", "value", val) + log.ZDebug(ctx, "GroupLocalCache GetGroupInfo return", "groupID", groupID, "userID", userID, "value", val) } else { - log.ZError(ctx, "GroupLocalCache GetGroupInfo return", err) + log.ZError(ctx, "GroupLocalCache GetGroupInfo return", err, "groupID", groupID, "userID", userID) } }() - return localcache.AnyValue[*sdkws.GroupMemberFullInfo](g.local.Get(ctx, cachekey.GetGroupMemberInfoKey(groupID, userID), func(ctx context.Context) (any, error) { + var cache cacheProto[sdkws.GroupMemberFullInfo] + return cache.Unmarshal(g.local.Get(ctx, cachekey.GetGroupMemberInfoKey(groupID, userID), func(ctx context.Context) ([]byte, error) { log.ZDebug(ctx, "GroupLocalCache GetGroupInfo rpc", "groupID", groupID, "userID", userID) - return g.client.GetGroupMemberCache(ctx, groupID, userID) + return cache.Marshal(g.client.GetGroupMemberCache(ctx, groupID, userID)) })) } @@ -85,14 +89,15 @@ func (g *GroupLocalCache) GetGroupInfo(ctx context.Context, groupID string) (val log.ZDebug(ctx, "GroupLocalCache GetGroupInfo req", "groupID", groupID) defer func() { if err == nil { - log.ZDebug(ctx, "GroupLocalCache GetGroupInfo return", "value", val) + log.ZDebug(ctx, "GroupLocalCache GetGroupInfo return", "groupID", groupID, "value", val) } else { - log.ZError(ctx, "GroupLocalCache GetGroupInfo return", err) + log.ZError(ctx, "GroupLocalCache GetGroupInfo return", err, "groupID", groupID) } }() - return localcache.AnyValue[*sdkws.GroupInfo](g.local.Get(ctx, cachekey.GetGroupInfoKey(groupID), func(ctx context.Context) (any, error) { + var cache cacheProto[sdkws.GroupInfo] + return cache.Unmarshal(g.local.Get(ctx, cachekey.GetGroupInfoKey(groupID), func(ctx context.Context) ([]byte, error) { log.ZDebug(ctx, "GroupLocalCache GetGroupInfo rpc", "groupID", groupID) - return g.client.GetGroupInfoCache(ctx, groupID) + return cache.Marshal(g.client.GetGroupInfoCache(ctx, groupID)) })) } @@ -101,7 +106,7 @@ func (g *GroupLocalCache) GetGroupMemberIDs(ctx context.Context, groupID string) if err != nil { return nil, err } - return res.List, nil + return res.UserIDs, nil } func (g *GroupLocalCache) GetGroupMemberIDMap(ctx context.Context, groupID string) (map[string]struct{}, error) { @@ -109,7 +114,7 @@ func (g *GroupLocalCache) GetGroupMemberIDMap(ctx context.Context, groupID strin if err != nil { return nil, err } - return res.Map, nil + return datautil.SliceSet(res.UserIDs), nil } func (g *GroupLocalCache) GetGroupInfos(ctx context.Context, groupIDs []string) ([]*sdkws.GroupInfo, error) { diff --git a/pkg/rpccache/online.go b/pkg/rpccache/online.go index 13578a7df6..2ffa1f1577 100644 --- a/pkg/rpccache/online.go +++ b/pkg/rpccache/online.go @@ -47,7 +47,7 @@ type OnlineCache struct { local lru.LRU[string, []int32] } -func (o *OnlineCache) GetUserOnlinePlatform(ctx context.Context, userID string) ([]int32, error) { +func (o *OnlineCache) getUserOnlinePlatform(ctx context.Context, userID string) ([]int32, error) { platformIDs, err := o.local.Get(userID, func() ([]int32, error) { return o.user.GetUserOnlinePlatform(ctx, userID) }) @@ -59,8 +59,18 @@ func (o *OnlineCache) GetUserOnlinePlatform(ctx context.Context, userID string) return platformIDs, nil } +func (o *OnlineCache) GetUserOnlinePlatform(ctx context.Context, userID string) ([]int32, error) { + platformIDs, err := o.getUserOnlinePlatform(ctx, userID) + if err != nil { + return nil, err + } + tmp := make([]int32, len(platformIDs)) + copy(tmp, platformIDs) + return platformIDs, nil +} + func (o *OnlineCache) GetUserOnline(ctx context.Context, userID string) (bool, error) { - platformIDs, err := o.GetUserOnlinePlatform(ctx, userID) + platformIDs, err := o.getUserOnlinePlatform(ctx, userID) if err != nil { return false, err } diff --git a/pkg/rpccache/user.go b/pkg/rpccache/user.go index 6126f5891c..7c676f30a1 100644 --- a/pkg/rpccache/user.go +++ b/pkg/rpccache/user.go @@ -16,12 +16,12 @@ package rpccache import ( "context" - "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey" - "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey" "github.com/openimsdk/open-im-server/v3/pkg/localcache" "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" "github.com/openimsdk/protocol/sdkws" + "github.com/openimsdk/protocol/user" "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/log" "github.com/redis/go-redis/v9" @@ -32,7 +32,7 @@ func NewUserLocalCache(client rpcclient.UserRpcClient, localCache *config.LocalC log.ZDebug(context.Background(), "UserLocalCache", "topic", lc.Topic, "slotNum", lc.SlotNum, "slotSize", lc.SlotSize, "enable", lc.Enable()) x := &UserLocalCache{ client: client, - local: localcache.New[any]( + local: localcache.New[[]byte]( localcache.WithLocalSlotNum(lc.SlotNum), localcache.WithLocalSlotSize(lc.SlotSize), localcache.WithLinkSlotNum(lc.SlotNum), @@ -48,7 +48,7 @@ func NewUserLocalCache(client rpcclient.UserRpcClient, localCache *config.LocalC type UserLocalCache struct { client rpcclient.UserRpcClient - local localcache.Cache[any] + local localcache.Cache[[]byte] } func (u *UserLocalCache) GetUserInfo(ctx context.Context, userID string) (val *sdkws.UserInfo, err error) { @@ -60,24 +60,34 @@ func (u *UserLocalCache) GetUserInfo(ctx context.Context, userID string) (val *s log.ZError(ctx, "UserLocalCache GetUserInfo return", err) } }() - return localcache.AnyValue[*sdkws.UserInfo](u.local.Get(ctx, cachekey.GetUserInfoKey(userID), func(ctx context.Context) (any, error) { + var cache cacheProto[sdkws.UserInfo] + return cache.Unmarshal(u.local.Get(ctx, cachekey.GetUserInfoKey(userID), func(ctx context.Context) ([]byte, error) { log.ZDebug(ctx, "UserLocalCache GetUserInfo rpc", "userID", userID) - return u.client.GetUserInfo(ctx, userID) + return cache.Marshal(u.client.GetUserInfo(ctx, userID)) })) } func (u *UserLocalCache) GetUserGlobalMsgRecvOpt(ctx context.Context, userID string) (val int32, err error) { - log.ZDebug(ctx, "UserLocalCache GetUserGlobalMsgRecvOpt req", "userID", userID) + resp, err := u.getUserGlobalMsgRecvOpt(ctx, userID) + if err != nil { + return 0, err + } + return resp.GlobalRecvMsgOpt, nil +} + +func (u *UserLocalCache) getUserGlobalMsgRecvOpt(ctx context.Context, userID string) (val *user.GetGlobalRecvMessageOptResp, err error) { + log.ZDebug(ctx, "UserLocalCache getUserGlobalMsgRecvOpt req", "userID", userID) defer func() { if err == nil { - log.ZDebug(ctx, "UserLocalCache GetUserGlobalMsgRecvOpt return", "value", val) + log.ZDebug(ctx, "UserLocalCache getUserGlobalMsgRecvOpt return", "value", val) } else { - log.ZError(ctx, "UserLocalCache GetUserGlobalMsgRecvOpt return", err) + log.ZError(ctx, "UserLocalCache getUserGlobalMsgRecvOpt return", err) } }() - return localcache.AnyValue[int32](u.local.Get(ctx, cachekey.GetUserGlobalRecvMsgOptKey(userID), func(ctx context.Context) (any, error) { + var cache cacheProto[user.GetGlobalRecvMessageOptResp] + return cache.Unmarshal(u.local.Get(ctx, cachekey.GetUserGlobalRecvMsgOptKey(userID), func(ctx context.Context) ([]byte, error) { log.ZDebug(ctx, "UserLocalCache GetUserGlobalMsgRecvOpt rpc", "userID", userID) - return u.client.GetUserGlobalMsgRecvOpt(ctx, userID) + return cache.Marshal(u.client.Client.GetGlobalRecvMessageOpt(ctx, &user.GetGlobalRecvMessageOptReq{UserID: userID})) })) } @@ -110,18 +120,3 @@ func (u *UserLocalCache) GetUsersInfoMap(ctx context.Context, userIDs []string) } return users, nil } - -//func (u *UserLocalCache) GetUserOnlinePlatform(ctx context.Context, userID string) (val []int32, err error) { -// log.ZDebug(ctx, "UserLocalCache GetUserOnlinePlatform req", "userID", userID) -// defer func() { -// if err == nil { -// log.ZDebug(ctx, "UserLocalCache GetUserOnlinePlatform return", "value", val) -// } else { -// log.ZError(ctx, "UserLocalCache GetUserOnlinePlatform return", err) -// } -// }() -// return localcache.AnyValue[[]int32](u.local.Get(ctx, cachekey.GetOnlineKey(userID), func(ctx context.Context) (any, error) { -// log.ZDebug(ctx, "UserLocalCache GetUserGlobalMsgRecvOpt rpc", "userID", userID) -// return u.client.GetUserGlobalMsgRecvOpt(ctx, userID) -// })) -//} From c45967079eb20d4c756b7476e4d5a28c51b0e30f Mon Sep 17 00:00:00 2001 From: Monet Lee Date: Fri, 2 Aug 2024 18:30:17 +0800 Subject: [PATCH 2/3] feat: update go mod pkg to latest. (#2475) --- go.mod | 5 +++-- go.sum | 10 ++++++---- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/go.mod b/go.mod index 49254097a4..d1873d3528 100644 --- a/go.mod +++ b/go.mod @@ -12,8 +12,8 @@ require ( github.com/gorilla/websocket v1.5.1 github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/mitchellh/mapstructure v1.5.0 - github.com/openimsdk/protocol v0.0.69-alpha.50 - github.com/openimsdk/tools v0.0.49-alpha.55 + github.com/openimsdk/protocol v0.0.69 + github.com/openimsdk/tools v0.0.49 github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_golang v1.18.0 github.com/stretchr/testify v1.9.0 @@ -74,6 +74,7 @@ require ( github.com/beorn7/perks v1.0.1 // indirect github.com/bytedance/sonic v1.9.1 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect + github.com/chai2010/webp v1.1.1 // indirect github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 // indirect github.com/clbanning/mxj v1.8.4 // indirect github.com/coreos/go-semver v0.3.0 // indirect diff --git a/go.sum b/go.sum index ac525b2db6..95e16f81af 100644 --- a/go.sum +++ b/go.sum @@ -71,6 +71,8 @@ github.com/bytedance/sonic v1.9.1/go.mod h1:i736AoUSYt75HyZLoJW9ERYxcy6eaN6h4BZX github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/chai2010/webp v1.1.1 h1:jTRmEccAJ4MGrhFOrPMpNGIJ/eybIgwKpcACsrTEapk= +github.com/chai2010/webp v1.1.1/go.mod h1:0XVwvZWdjjdxpUEIf7b9g9VkHFnInUSYujwqTLEuldU= github.com/chenzhuoyu/base64x v0.0.0-20211019084208-fb5309c8db06/go.mod h1:DH46F32mSOjUmXrMHnKwZdA8wcEefY7UVqBKYGjpdQY= github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 h1:qSGYFH7+jGhDF8vLC+iwCD4WpbV1EBDSzWkJODFLams= github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311/go.mod h1:b583jCggY9gE99b6G5LEC39OIiVsWj+R97kbl5odCEk= @@ -319,10 +321,10 @@ github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y= github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM= github.com/openimsdk/gomake v0.0.14-alpha.5 h1:VY9c5x515lTfmdhhPjMvR3BBRrRquAUCFsz7t7vbv7Y= github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= -github.com/openimsdk/protocol v0.0.69-alpha.50 h1:4r6vY9LsjFrR8AAwORFhijOGmq2vzDH3XTX4wBiw+2M= -github.com/openimsdk/protocol v0.0.69-alpha.50/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8= -github.com/openimsdk/tools v0.0.49-alpha.55 h1:KPgC53oqiwZYssLKljhtXbWXifMlTj2SSQEusj4Uf4k= -github.com/openimsdk/tools v0.0.49-alpha.55/go.mod h1:h1cYmfyaVtgFbKmb1Cfsl8XwUOMTt8ubVUQrdGtsUh4= +github.com/openimsdk/protocol v0.0.69 h1:dVi8meSg8kmUzSH1XQab4MjihqKkkcCAmt1BYXPJuXo= +github.com/openimsdk/protocol v0.0.69/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8= +github.com/openimsdk/tools v0.0.49 h1:yILTgOCqxlqJMc889fE99E5ZGa70v/E3hkCSeTnWl3s= +github.com/openimsdk/tools v0.0.49/go.mod h1:oiSQU5Z6fzjxKFjbqDHImD8EmCIwClU1Rkur1sK12Po= github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4= github.com/pelletier/go-toml/v2 v2.1.0/go.mod h1:tJU2Z3ZkXwnxa4DPO899bsyIoywizdUvyaeZurnPPDc= github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= From 840ddc15fe12742656fdaba80e7e19b63f27b845 Mon Sep 17 00:00:00 2001 From: Monet Lee Date: Fri, 2 Aug 2024 20:13:13 +0800 Subject: [PATCH 3/3] chore: revert tool pkg version. (#2476) * feat: update go mod pkg to latest. * revert tool pkg version. --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index d1873d3528..fba1499fe1 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,7 @@ require ( github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/mitchellh/mapstructure v1.5.0 github.com/openimsdk/protocol v0.0.69 - github.com/openimsdk/tools v0.0.49 + github.com/openimsdk/tools v0.0.49-alpha.55 github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_golang v1.18.0 github.com/stretchr/testify v1.9.0 diff --git a/go.sum b/go.sum index 95e16f81af..1a8e1d76d8 100644 --- a/go.sum +++ b/go.sum @@ -323,8 +323,8 @@ github.com/openimsdk/gomake v0.0.14-alpha.5 h1:VY9c5x515lTfmdhhPjMvR3BBRrRquAUCF github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= github.com/openimsdk/protocol v0.0.69 h1:dVi8meSg8kmUzSH1XQab4MjihqKkkcCAmt1BYXPJuXo= github.com/openimsdk/protocol v0.0.69/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8= -github.com/openimsdk/tools v0.0.49 h1:yILTgOCqxlqJMc889fE99E5ZGa70v/E3hkCSeTnWl3s= -github.com/openimsdk/tools v0.0.49/go.mod h1:oiSQU5Z6fzjxKFjbqDHImD8EmCIwClU1Rkur1sK12Po= +github.com/openimsdk/tools v0.0.49-alpha.55 h1:KPgC53oqiwZYssLKljhtXbWXifMlTj2SSQEusj4Uf4k= +github.com/openimsdk/tools v0.0.49-alpha.55/go.mod h1:h1cYmfyaVtgFbKmb1Cfsl8XwUOMTt8ubVUQrdGtsUh4= github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4= github.com/pelletier/go-toml/v2 v2.1.0/go.mod h1:tJU2Z3ZkXwnxa4DPO899bsyIoywizdUvyaeZurnPPDc= github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ=