Skip to content

Commit

Permalink
sync
Browse files Browse the repository at this point in the history
  • Loading branch information
withchao committed Jun 7, 2024
1 parent a1523f4 commit 58c4c13
Show file tree
Hide file tree
Showing 16 changed files with 161 additions and 93 deletions.
12 changes: 10 additions & 2 deletions internal/api/group.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,10 +179,18 @@ func (o *GroupApi) GetIncrementalGroupMemberBatch(c *gin.Context) {
return
}
resp.List[req.GroupID] = res
changeCount += len(res.Changes) + len(res.DeleteUserIds)
if changeCount >= int(res.SyncCount) {
changeCount += len(res.Insert) + len(res.Delete) + len(res.Update)
if changeCount >= 200 {
break
}
}
apiresp.GinSuccess(c, resp)
}

func (o *GroupApi) GetIncrementalGroupMemberUserIDs(c *gin.Context) {
a2r.Call(group.GroupClient.GetIncrementalGroupMemberUserIDs, o.Client, c)
}

func (o *GroupApi) GetIncrementalJoinGroupIDs(c *gin.Context) {
a2r.Call(group.GroupClient.GetIncrementalJoinGroupIDs, o.Client, c)
}
2 changes: 2 additions & 0 deletions internal/api/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ func newGinRouter(disCov discovery.SvcDiscoveryRegistry, config *Config) *gin.En
groupRouterGroup.POST("/get_incremental_join_group", g.GetIncrementalJoinGroup)
groupRouterGroup.POST("/get_incremental_group_member", g.GetIncrementalGroupMember)
groupRouterGroup.POST("/get_incremental_group_member_batch", g.GetIncrementalGroupMemberBatch)
groupRouterGroup.POST("/get_incremental_group_member_user_ids", g.GetIncrementalGroupMemberUserIDs)
groupRouterGroup.POST("/get_incremental_join_group_ids", g.GetIncrementalJoinGroupIDs)
}
// certificate
authRouterGroup := r.Group("/auth")
Expand Down
3 changes: 0 additions & 3 deletions internal/rpc/friend/friend.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,6 @@ type Config struct {
}

func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error {
if config.RpcConfig.FriendSyncCount < 1 {
config.RpcConfig.FriendSyncCount = constant.MaxSyncPullNumber
}
mgocli, err := mongoutil.NewMongoDB(ctx, config.MongodbConfig.Build())
if err != nil {
return err
Expand Down
14 changes: 7 additions & 7 deletions internal/rpc/friend/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,14 @@ func (s *friendServer) GetIncrementalFriends(ctx context.Context, req *relation.
return s.getFriend(ctx, req.UserID, ids)
},
ID: func(elem *sdkws.FriendInfo) string { return elem.FriendUser.UserID },
Resp: func(version *model.VersionLog, delIDs []string, list []*sdkws.FriendInfo, full bool) *relation.GetIncrementalFriendsResp {
Resp: func(version *model.VersionLog, deleteIds []string, insertList, updateList []*sdkws.FriendInfo, full bool) *relation.GetIncrementalFriendsResp {
return &relation.GetIncrementalFriendsResp{
VersionID: version.ID.Hex(),
Version: uint64(version.Version),
Full: full,
SyncCount: uint32(s.config.RpcConfig.FriendSyncCount),
DeleteUserIds: delIDs,
Changes: list,
VersionID: version.ID.Hex(),
Version: uint64(version.Version),
Full: full,
Delete: deleteIds,
Insert: insertList,
Update: updateList,
}
},
}
Expand Down
5 changes: 1 addition & 4 deletions internal/rpc/group/group.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,6 @@ type Config struct {
}

func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error {
if config.RpcConfig.GroupSyncCount <= 0 {
config.RpcConfig.GroupSyncCount = constant.MaxSyncPullNumber
}
mgocli, err := mongoutil.NewMongoDB(ctx, config.MongodbConfig.Build())
if err != nil {
return err
Expand All @@ -104,7 +101,7 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg
msgRpcClient := rpcclient.NewMessageRpcClient(client, config.Share.RpcRegisterName.Msg)
conversationRpcClient := rpcclient.NewConversationRpcClient(client, config.Share.RpcRegisterName.Conversation)
var gs groupServer
database := controller.NewGroupDatabase(rdb, &config.LocalCacheConfig, groupDB, groupMemberDB, groupRequestDB, mgocli.GetTx(), grouphash.NewGroupHashFromGroupServer(&gs), config.RpcConfig.GroupSyncCount)
database := controller.NewGroupDatabase(rdb, &config.LocalCacheConfig, groupDB, groupMemberDB, groupRequestDB, mgocli.GetTx(), grouphash.NewGroupHashFromGroupServer(&gs))
gs.db = database
gs.user = userRpcClient
gs.notification = NewGroupNotificationSender(database, &msgRpcClient, &userRpcClient, config, func(ctx context.Context, userIDs []string) ([]notification.CommonUser, error) {
Expand Down
82 changes: 68 additions & 14 deletions internal/rpc/group/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,67 @@ package group

import (
"context"
"crypto/md5"
"encoding/binary"
"encoding/json"
"github.com/openimsdk/open-im-server/v3/internal/rpc/incrversion"
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
pbgroup "github.com/openimsdk/protocol/group"
"github.com/openimsdk/protocol/sdkws"
)

func (s *groupServer) idHash(ids []string) uint64 {
if len(ids) == 0 {
return 0
}
data, _ := json.Marshal(ids)
sum := md5.Sum(data)
return binary.BigEndian.Uint64(sum[:])
}

func (s *groupServer) GetIncrementalGroupMemberUserIDs(ctx context.Context, req *pbgroup.GetIncrementalGroupMemberUserIDsReq) (*pbgroup.GetIncrementalGroupMemberUserIDsResp, error) {
vl, err := s.db.FindMaxGroupMemberVersionCache(ctx, req.GroupID)
if err != nil {
return nil, err
}
userIDs, err := s.db.FindGroupMemberUserID(ctx, req.GroupID)
if err != nil {
return nil, err
}
idHash := s.idHash(userIDs)
if req.IdHash == idHash {
userIDs = nil
}
return &pbgroup.GetIncrementalGroupMemberUserIDsResp{
Version: idHash,
VersionID: vl.ID.Hex(),
Equal: req.IdHash == idHash,
UserIDs: userIDs,
}, nil
}

func (s *groupServer) GetIncrementalJoinGroupIDs(ctx context.Context, req *pbgroup.GetIncrementalJoinGroupIDsReq) (*pbgroup.GetIncrementalJoinGroupIDsResp, error) {
vl, err := s.db.FindMaxJoinGroupVersionCache(ctx, req.UserID)
if err != nil {
return nil, err
}
groupIDs, err := s.db.FindJoinGroupID(ctx, req.UserID)
if err != nil {
return nil, err
}
idHash := s.idHash(groupIDs)
if req.IdHash == idHash {
groupIDs = nil
}
return &pbgroup.GetIncrementalJoinGroupIDsResp{
Version: idHash,
VersionID: vl.ID.Hex(),
Equal: req.IdHash == idHash,
GroupIDs: groupIDs,
}, nil
}

func (s *groupServer) GetIncrementalGroupMember(ctx context.Context, req *pbgroup.GetIncrementalGroupMemberReq) (*pbgroup.GetIncrementalGroupMemberResp, error) {
opt := incrversion.Option[*sdkws.GroupMemberFullInfo, pbgroup.GetIncrementalGroupMemberResp]{
Ctx: ctx,
Expand All @@ -21,14 +75,14 @@ func (s *groupServer) GetIncrementalGroupMember(ctx context.Context, req *pbgrou
return s.getGroupMembersInfo(ctx, req.GroupID, ids)
},
ID: func(elem *sdkws.GroupMemberFullInfo) string { return elem.UserID },
Resp: func(version *model.VersionLog, delIDs []string, list []*sdkws.GroupMemberFullInfo, full bool) *pbgroup.GetIncrementalGroupMemberResp {
Resp: func(version *model.VersionLog, delIDs []string, insertList, updateList []*sdkws.GroupMemberFullInfo, full bool) *pbgroup.GetIncrementalGroupMemberResp {
return &pbgroup.GetIncrementalGroupMemberResp{
VersionID: version.ID.Hex(),
Version: uint64(version.Version),
Full: full,
SyncCount: uint32(s.config.RpcConfig.GroupSyncCount),
DeleteUserIds: delIDs,
Changes: list,
VersionID: version.ID.Hex(),
Version: uint64(version.Version),
Full: full,
Delete: delIDs,
Insert: insertList,
Update: updateList,
}
},
}
Expand All @@ -48,14 +102,14 @@ func (s *groupServer) GetIncrementalJoinGroup(ctx context.Context, req *pbgroup.
CacheMaxVersion: s.db.FindMaxJoinGroupVersionCache,
Find: s.getGroupsInfo,
ID: func(elem *sdkws.GroupInfo) string { return elem.GroupID },
Resp: func(version *model.VersionLog, delIDs []string, list []*sdkws.GroupInfo, full bool) *pbgroup.GetIncrementalJoinGroupResp {
Resp: func(version *model.VersionLog, delIDs []string, insertList, updateList []*sdkws.GroupInfo, full bool) *pbgroup.GetIncrementalJoinGroupResp {
return &pbgroup.GetIncrementalJoinGroupResp{
VersionID: version.ID.Hex(),
Version: uint64(version.Version),
Full: full,
SyncCount: uint32(s.config.RpcConfig.GroupSyncCount),
DeleteGroupIds: delIDs,
Changes: list,
VersionID: version.ID.Hex(),
Version: uint64(version.Version),
Full: full,
Delete: delIDs,
Insert: insertList,
Update: updateList,
}
},
}
Expand Down
40 changes: 18 additions & 22 deletions internal/rpc/incrversion/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"fmt"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/utils/datautil"
"go.mongodb.org/mongo-driver/bson/primitive"
)

Expand Down Expand Up @@ -35,7 +34,7 @@ type Option[A, B any] struct {
//SortID func(ctx context.Context, dId string) ([]string, error)
Find func(ctx context.Context, ids []string) ([]A, error)
ID func(elem A) string
Resp func(version *model.VersionLog, delIDs []string, list []A, full bool) *B
Resp func(version *model.VersionLog, deleteIds []string, insertList, updateList []A, full bool) *B
}

func (o *Option[A, B]) newError(msg string) error {
Expand Down Expand Up @@ -130,31 +129,28 @@ func (o *Option[A, B]) Build() (*B, error) {
panic(fmt.Errorf("undefined tag %d", tag))
}
var (
deleteIDs []string
changeIDs []string
insertIds []string
deleteIds []string
updateIds []string
)
if full {
//changeIDs, err = o.SortID(o.Ctx, o.VersionKey)
//if err != nil {
// return nil, err
//}
} else {
deleteIDs, changeIDs = version.DeleteAndChangeIDs()
if !full {
insertIds, deleteIds, updateIds = version.DeleteAndChangeIDs()
}
var list []A
if len(changeIDs) > 0 {
list, err = o.Find(o.Ctx, changeIDs)
var (
insertList []A
updateList []A
)
if len(insertIds) > 0 {
insertList, err = o.Find(o.Ctx, insertIds)
if err != nil {
return nil, err
}
if (!full) && o.ID != nil && len(changeIDs) != len(list) {
foundIDs := datautil.SliceSetAny(list, o.ID)
for _, id := range changeIDs {
if _, ok := foundIDs[id]; !ok {
deleteIDs = append(deleteIDs, id)
}
}
}
if len(updateIds) > 0 {
updateList, err = o.Find(o.Ctx, updateIds)
if err != nil {
return nil, err
}
}
return o.Resp(version, deleteIDs, list, full), nil
return o.Resp(version, deleteIds, insertList, updateList, full), nil
}
6 changes: 2 additions & 4 deletions pkg/common/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,8 +244,7 @@ type Friend struct {
ListenIP string `mapstructure:"listenIP"`
Ports []int `mapstructure:"ports"`
} `mapstructure:"rpc"`
Prometheus Prometheus `mapstructure:"prometheus"`
FriendSyncCount int `mapstructure:"friendSyncCount"`
Prometheus Prometheus `mapstructure:"prometheus"`
}

type Group struct {
Expand All @@ -254,8 +253,7 @@ type Group struct {
ListenIP string `mapstructure:"listenIP"`
Ports []int `mapstructure:"ports"`
} `mapstructure:"rpc"`
Prometheus Prometheus `mapstructure:"prometheus"`
GroupSyncCount int `mapstructure:"groupSyncCount"`
Prometheus Prometheus `mapstructure:"prometheus"`
}

type Msg struct {
Expand Down
3 changes: 0 additions & 3 deletions pkg/common/storage/cache/redis/group.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ type GroupCacheRedis struct {
expireTime time.Duration
rcClient *rockscache.Client
groupHash cache.GroupHash
syncCount int
}

func NewGroupCacheRedis(
Expand All @@ -56,7 +55,6 @@ func NewGroupCacheRedis(
groupRequestDB database.GroupRequest,
hashCode cache.GroupHash,
opts *rockscache.Options,
syncCount int,
) cache.GroupCache {
batchHandler := NewBatchDeleterRedis(rdb, opts, []string{localCache.Group.Topic})
g := localCache.Group
Expand All @@ -70,7 +68,6 @@ func NewGroupCacheRedis(
groupMemberDB: groupMemberDB,
groupRequestDB: groupRequestDB,
groupHash: hashCode,
syncCount: syncCount,
}
}

Expand Down
13 changes: 9 additions & 4 deletions pkg/common/storage/controller/group.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,8 @@ type GroupDatabase interface {
FindMaxJoinGroupVersionCache(ctx context.Context, userID string) (*model.VersionLog, error)

SearchJoinGroup(ctx context.Context, userID string, keyword string, pagination pagination.Pagination) (int64, []*model.Group, error)

FindJoinGroupID(ctx context.Context, userID string) ([]string, error)
}

func NewGroupDatabase(
Expand All @@ -127,14 +129,13 @@ func NewGroupDatabase(
groupRequestDB database.GroupRequest,
ctxTx tx.Tx,
groupHash cache.GroupHash,
syncCount int,
) GroupDatabase {
return &groupDatabase{
groupDB: groupDB,
groupMemberDB: groupMemberDB,
groupRequestDB: groupRequestDB,
ctxTx: ctxTx,
cache: redis2.NewGroupCacheRedis(rdb, localCache, groupDB, groupMemberDB, groupRequestDB, groupHash, redis2.GetRocksCacheOptions(), syncCount),
cache: redis2.NewGroupCacheRedis(rdb, localCache, groupDB, groupMemberDB, groupRequestDB, groupHash, redis2.GetRocksCacheOptions()),
}
}

Expand All @@ -146,6 +147,10 @@ type groupDatabase struct {
cache cache.GroupCache
}

func (g *groupDatabase) FindJoinGroupID(ctx context.Context, userID string) ([]string, error) {
return g.cache.GetJoinedGroupIDs(ctx, userID)
}

func (g *groupDatabase) FindGroupMembers(ctx context.Context, groupID string, userIDs []string) ([]*model.GroupMember, error) {
return g.cache.GetGroupMembersInfo(ctx, groupID, userIDs)
}
Expand Down Expand Up @@ -243,7 +248,7 @@ func (g *groupDatabase) UpdateGroup(ctx context.Context, groupID string, data ma
return err
}
for _, userID := range userIDs {
if err := g.groupMemberDB.JoinGroupIncrVersion(ctx, userID, []string{groupID}, false); err != nil {
if err := g.groupMemberDB.JoinGroupIncrVersion(ctx, userID, []string{groupID}, model.VersionStateUpdate); err != nil {
return err
}

Expand Down Expand Up @@ -276,7 +281,7 @@ func (g *groupDatabase) DismissGroup(ctx context.Context, groupID string, delete
}
c = c.DelMaxJoinGroupVersion(userIDs...)
if len(userIDs) > 0 {
if err := g.groupMemberDB.JoinGroupIncrVersion(ctx, groupID, userIDs, true); err != nil {
if err := g.groupMemberDB.JoinGroupIncrVersion(ctx, groupID, userIDs, model.VersionStateDelete); err != nil {
return err
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/common/storage/database/group_member.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type GroupMember interface {
TakeGroupMemberNum(ctx context.Context, groupID string) (count int64, err error)
FindUserManagedGroupID(ctx context.Context, userID string) (groupIDs []string, err error)
IsUpdateRoleLevel(data map[string]any) bool
JoinGroupIncrVersion(ctx context.Context, userID string, groupIDs []string, deleted bool) error
JoinGroupIncrVersion(ctx context.Context, userID string, groupIDs []string, state int32) error
FindMemberIncrVersion(ctx context.Context, groupID string, version uint, limit int) (*model.VersionLog, error)
FindJoinIncrVersion(ctx context.Context, userID string, version uint, limit int) (*model.VersionLog, error)
}
Loading

0 comments on commit 58c4c13

Please sign in to comment.