From 58c4c13cf1179437f42a9a68c1ec84deaa5b604c Mon Sep 17 00:00:00 2001 From: withchao <993506633@qq.com> Date: Fri, 7 Jun 2024 17:45:41 +0800 Subject: [PATCH] sync --- internal/api/group.go | 12 ++- internal/api/router.go | 2 + internal/rpc/friend/friend.go | 3 - internal/rpc/friend/sync.go | 14 ++-- internal/rpc/group/group.go | 5 +- internal/rpc/group/sync.go | 82 +++++++++++++++---- internal/rpc/incrversion/option.go | 40 ++++----- pkg/common/config/config.go | 6 +- pkg/common/storage/cache/redis/group.go | 3 - pkg/common/storage/controller/group.go | 13 ++- pkg/common/storage/database/group_member.go | 2 +- pkg/common/storage/database/mgo/friend.go | 8 +- .../storage/database/mgo/group_member.go | 18 ++-- .../storage/database/mgo/version_log.go | 18 ++-- pkg/common/storage/database/version_log.go | 2 +- pkg/common/storage/model/version_log.go | 26 ++++-- 16 files changed, 161 insertions(+), 93 deletions(-) diff --git a/internal/api/group.go b/internal/api/group.go index 5b8b9d0442..0bf61c7878 100644 --- a/internal/api/group.go +++ b/internal/api/group.go @@ -179,10 +179,18 @@ func (o *GroupApi) GetIncrementalGroupMemberBatch(c *gin.Context) { return } resp.List[req.GroupID] = res - changeCount += len(res.Changes) + len(res.DeleteUserIds) - if changeCount >= int(res.SyncCount) { + changeCount += len(res.Insert) + len(res.Delete) + len(res.Update) + if changeCount >= 200 { break } } apiresp.GinSuccess(c, resp) } + +func (o *GroupApi) GetIncrementalGroupMemberUserIDs(c *gin.Context) { + a2r.Call(group.GroupClient.GetIncrementalGroupMemberUserIDs, o.Client, c) +} + +func (o *GroupApi) GetIncrementalJoinGroupIDs(c *gin.Context) { + a2r.Call(group.GroupClient.GetIncrementalJoinGroupIDs, o.Client, c) +} diff --git a/internal/api/router.go b/internal/api/router.go index 445a24694c..69c026ffd6 100644 --- a/internal/api/router.go +++ b/internal/api/router.go @@ -120,6 +120,8 @@ func newGinRouter(disCov discovery.SvcDiscoveryRegistry, config *Config) *gin.En groupRouterGroup.POST("/get_incremental_join_group", g.GetIncrementalJoinGroup) groupRouterGroup.POST("/get_incremental_group_member", g.GetIncrementalGroupMember) groupRouterGroup.POST("/get_incremental_group_member_batch", g.GetIncrementalGroupMemberBatch) + groupRouterGroup.POST("/get_incremental_group_member_user_ids", g.GetIncrementalGroupMemberUserIDs) + groupRouterGroup.POST("/get_incremental_join_group_ids", g.GetIncrementalJoinGroupIDs) } // certificate authRouterGroup := r.Group("/auth") diff --git a/internal/rpc/friend/friend.go b/internal/rpc/friend/friend.go index cbc16cca96..12107125c1 100644 --- a/internal/rpc/friend/friend.go +++ b/internal/rpc/friend/friend.go @@ -64,9 +64,6 @@ type Config struct { } func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error { - if config.RpcConfig.FriendSyncCount < 1 { - config.RpcConfig.FriendSyncCount = constant.MaxSyncPullNumber - } mgocli, err := mongoutil.NewMongoDB(ctx, config.MongodbConfig.Build()) if err != nil { return err diff --git a/internal/rpc/friend/sync.go b/internal/rpc/friend/sync.go index a73cf6a170..cddd23f898 100644 --- a/internal/rpc/friend/sync.go +++ b/internal/rpc/friend/sync.go @@ -25,14 +25,14 @@ func (s *friendServer) GetIncrementalFriends(ctx context.Context, req *relation. return s.getFriend(ctx, req.UserID, ids) }, ID: func(elem *sdkws.FriendInfo) string { return elem.FriendUser.UserID }, - Resp: func(version *model.VersionLog, delIDs []string, list []*sdkws.FriendInfo, full bool) *relation.GetIncrementalFriendsResp { + Resp: func(version *model.VersionLog, deleteIds []string, insertList, updateList []*sdkws.FriendInfo, full bool) *relation.GetIncrementalFriendsResp { return &relation.GetIncrementalFriendsResp{ - VersionID: version.ID.Hex(), - Version: uint64(version.Version), - Full: full, - SyncCount: uint32(s.config.RpcConfig.FriendSyncCount), - DeleteUserIds: delIDs, - Changes: list, + VersionID: version.ID.Hex(), + Version: uint64(version.Version), + Full: full, + Delete: deleteIds, + Insert: insertList, + Update: updateList, } }, } diff --git a/internal/rpc/group/group.go b/internal/rpc/group/group.go index 1cdcdb54b5..4d429b3d3b 100644 --- a/internal/rpc/group/group.go +++ b/internal/rpc/group/group.go @@ -77,9 +77,6 @@ type Config struct { } func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error { - if config.RpcConfig.GroupSyncCount <= 0 { - config.RpcConfig.GroupSyncCount = constant.MaxSyncPullNumber - } mgocli, err := mongoutil.NewMongoDB(ctx, config.MongodbConfig.Build()) if err != nil { return err @@ -104,7 +101,7 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg msgRpcClient := rpcclient.NewMessageRpcClient(client, config.Share.RpcRegisterName.Msg) conversationRpcClient := rpcclient.NewConversationRpcClient(client, config.Share.RpcRegisterName.Conversation) var gs groupServer - database := controller.NewGroupDatabase(rdb, &config.LocalCacheConfig, groupDB, groupMemberDB, groupRequestDB, mgocli.GetTx(), grouphash.NewGroupHashFromGroupServer(&gs), config.RpcConfig.GroupSyncCount) + database := controller.NewGroupDatabase(rdb, &config.LocalCacheConfig, groupDB, groupMemberDB, groupRequestDB, mgocli.GetTx(), grouphash.NewGroupHashFromGroupServer(&gs)) gs.db = database gs.user = userRpcClient gs.notification = NewGroupNotificationSender(database, &msgRpcClient, &userRpcClient, config, func(ctx context.Context, userIDs []string) ([]notification.CommonUser, error) { diff --git a/internal/rpc/group/sync.go b/internal/rpc/group/sync.go index b35816a7eb..4bd34eb4d4 100644 --- a/internal/rpc/group/sync.go +++ b/internal/rpc/group/sync.go @@ -2,6 +2,9 @@ package group import ( "context" + "crypto/md5" + "encoding/binary" + "encoding/json" "github.com/openimsdk/open-im-server/v3/internal/rpc/incrversion" "github.com/openimsdk/open-im-server/v3/pkg/authverify" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" @@ -9,6 +12,57 @@ import ( "github.com/openimsdk/protocol/sdkws" ) +func (s *groupServer) idHash(ids []string) uint64 { + if len(ids) == 0 { + return 0 + } + data, _ := json.Marshal(ids) + sum := md5.Sum(data) + return binary.BigEndian.Uint64(sum[:]) +} + +func (s *groupServer) GetIncrementalGroupMemberUserIDs(ctx context.Context, req *pbgroup.GetIncrementalGroupMemberUserIDsReq) (*pbgroup.GetIncrementalGroupMemberUserIDsResp, error) { + vl, err := s.db.FindMaxGroupMemberVersionCache(ctx, req.GroupID) + if err != nil { + return nil, err + } + userIDs, err := s.db.FindGroupMemberUserID(ctx, req.GroupID) + if err != nil { + return nil, err + } + idHash := s.idHash(userIDs) + if req.IdHash == idHash { + userIDs = nil + } + return &pbgroup.GetIncrementalGroupMemberUserIDsResp{ + Version: idHash, + VersionID: vl.ID.Hex(), + Equal: req.IdHash == idHash, + UserIDs: userIDs, + }, nil +} + +func (s *groupServer) GetIncrementalJoinGroupIDs(ctx context.Context, req *pbgroup.GetIncrementalJoinGroupIDsReq) (*pbgroup.GetIncrementalJoinGroupIDsResp, error) { + vl, err := s.db.FindMaxJoinGroupVersionCache(ctx, req.UserID) + if err != nil { + return nil, err + } + groupIDs, err := s.db.FindJoinGroupID(ctx, req.UserID) + if err != nil { + return nil, err + } + idHash := s.idHash(groupIDs) + if req.IdHash == idHash { + groupIDs = nil + } + return &pbgroup.GetIncrementalJoinGroupIDsResp{ + Version: idHash, + VersionID: vl.ID.Hex(), + Equal: req.IdHash == idHash, + GroupIDs: groupIDs, + }, nil +} + func (s *groupServer) GetIncrementalGroupMember(ctx context.Context, req *pbgroup.GetIncrementalGroupMemberReq) (*pbgroup.GetIncrementalGroupMemberResp, error) { opt := incrversion.Option[*sdkws.GroupMemberFullInfo, pbgroup.GetIncrementalGroupMemberResp]{ Ctx: ctx, @@ -21,14 +75,14 @@ func (s *groupServer) GetIncrementalGroupMember(ctx context.Context, req *pbgrou return s.getGroupMembersInfo(ctx, req.GroupID, ids) }, ID: func(elem *sdkws.GroupMemberFullInfo) string { return elem.UserID }, - Resp: func(version *model.VersionLog, delIDs []string, list []*sdkws.GroupMemberFullInfo, full bool) *pbgroup.GetIncrementalGroupMemberResp { + Resp: func(version *model.VersionLog, delIDs []string, insertList, updateList []*sdkws.GroupMemberFullInfo, full bool) *pbgroup.GetIncrementalGroupMemberResp { return &pbgroup.GetIncrementalGroupMemberResp{ - VersionID: version.ID.Hex(), - Version: uint64(version.Version), - Full: full, - SyncCount: uint32(s.config.RpcConfig.GroupSyncCount), - DeleteUserIds: delIDs, - Changes: list, + VersionID: version.ID.Hex(), + Version: uint64(version.Version), + Full: full, + Delete: delIDs, + Insert: insertList, + Update: updateList, } }, } @@ -48,14 +102,14 @@ func (s *groupServer) GetIncrementalJoinGroup(ctx context.Context, req *pbgroup. CacheMaxVersion: s.db.FindMaxJoinGroupVersionCache, Find: s.getGroupsInfo, ID: func(elem *sdkws.GroupInfo) string { return elem.GroupID }, - Resp: func(version *model.VersionLog, delIDs []string, list []*sdkws.GroupInfo, full bool) *pbgroup.GetIncrementalJoinGroupResp { + Resp: func(version *model.VersionLog, delIDs []string, insertList, updateList []*sdkws.GroupInfo, full bool) *pbgroup.GetIncrementalJoinGroupResp { return &pbgroup.GetIncrementalJoinGroupResp{ - VersionID: version.ID.Hex(), - Version: uint64(version.Version), - Full: full, - SyncCount: uint32(s.config.RpcConfig.GroupSyncCount), - DeleteGroupIds: delIDs, - Changes: list, + VersionID: version.ID.Hex(), + Version: uint64(version.Version), + Full: full, + Delete: delIDs, + Insert: insertList, + Update: updateList, } }, } diff --git a/internal/rpc/incrversion/option.go b/internal/rpc/incrversion/option.go index ceb024a3e1..f7a71244a0 100644 --- a/internal/rpc/incrversion/option.go +++ b/internal/rpc/incrversion/option.go @@ -5,7 +5,6 @@ import ( "fmt" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" "github.com/openimsdk/tools/errs" - "github.com/openimsdk/tools/utils/datautil" "go.mongodb.org/mongo-driver/bson/primitive" ) @@ -35,7 +34,7 @@ type Option[A, B any] struct { //SortID func(ctx context.Context, dId string) ([]string, error) Find func(ctx context.Context, ids []string) ([]A, error) ID func(elem A) string - Resp func(version *model.VersionLog, delIDs []string, list []A, full bool) *B + Resp func(version *model.VersionLog, deleteIds []string, insertList, updateList []A, full bool) *B } func (o *Option[A, B]) newError(msg string) error { @@ -130,31 +129,28 @@ func (o *Option[A, B]) Build() (*B, error) { panic(fmt.Errorf("undefined tag %d", tag)) } var ( - deleteIDs []string - changeIDs []string + insertIds []string + deleteIds []string + updateIds []string ) - if full { - //changeIDs, err = o.SortID(o.Ctx, o.VersionKey) - //if err != nil { - // return nil, err - //} - } else { - deleteIDs, changeIDs = version.DeleteAndChangeIDs() + if !full { + insertIds, deleteIds, updateIds = version.DeleteAndChangeIDs() } - var list []A - if len(changeIDs) > 0 { - list, err = o.Find(o.Ctx, changeIDs) + var ( + insertList []A + updateList []A + ) + if len(insertIds) > 0 { + insertList, err = o.Find(o.Ctx, insertIds) if err != nil { return nil, err } - if (!full) && o.ID != nil && len(changeIDs) != len(list) { - foundIDs := datautil.SliceSetAny(list, o.ID) - for _, id := range changeIDs { - if _, ok := foundIDs[id]; !ok { - deleteIDs = append(deleteIDs, id) - } - } + } + if len(updateIds) > 0 { + updateList, err = o.Find(o.Ctx, updateIds) + if err != nil { + return nil, err } } - return o.Resp(version, deleteIDs, list, full), nil + return o.Resp(version, deleteIds, insertList, updateList, full), nil } diff --git a/pkg/common/config/config.go b/pkg/common/config/config.go index a4177dc067..5313c196ac 100644 --- a/pkg/common/config/config.go +++ b/pkg/common/config/config.go @@ -244,8 +244,7 @@ type Friend struct { ListenIP string `mapstructure:"listenIP"` Ports []int `mapstructure:"ports"` } `mapstructure:"rpc"` - Prometheus Prometheus `mapstructure:"prometheus"` - FriendSyncCount int `mapstructure:"friendSyncCount"` + Prometheus Prometheus `mapstructure:"prometheus"` } type Group struct { @@ -254,8 +253,7 @@ type Group struct { ListenIP string `mapstructure:"listenIP"` Ports []int `mapstructure:"ports"` } `mapstructure:"rpc"` - Prometheus Prometheus `mapstructure:"prometheus"` - GroupSyncCount int `mapstructure:"groupSyncCount"` + Prometheus Prometheus `mapstructure:"prometheus"` } type Msg struct { diff --git a/pkg/common/storage/cache/redis/group.go b/pkg/common/storage/cache/redis/group.go index 4015d5cd98..c4598567d3 100644 --- a/pkg/common/storage/cache/redis/group.go +++ b/pkg/common/storage/cache/redis/group.go @@ -45,7 +45,6 @@ type GroupCacheRedis struct { expireTime time.Duration rcClient *rockscache.Client groupHash cache.GroupHash - syncCount int } func NewGroupCacheRedis( @@ -56,7 +55,6 @@ func NewGroupCacheRedis( groupRequestDB database.GroupRequest, hashCode cache.GroupHash, opts *rockscache.Options, - syncCount int, ) cache.GroupCache { batchHandler := NewBatchDeleterRedis(rdb, opts, []string{localCache.Group.Topic}) g := localCache.Group @@ -70,7 +68,6 @@ func NewGroupCacheRedis( groupMemberDB: groupMemberDB, groupRequestDB: groupRequestDB, groupHash: hashCode, - syncCount: syncCount, } } diff --git a/pkg/common/storage/controller/group.go b/pkg/common/storage/controller/group.go index 14406420b0..cb64bc73a3 100644 --- a/pkg/common/storage/controller/group.go +++ b/pkg/common/storage/controller/group.go @@ -117,6 +117,8 @@ type GroupDatabase interface { FindMaxJoinGroupVersionCache(ctx context.Context, userID string) (*model.VersionLog, error) SearchJoinGroup(ctx context.Context, userID string, keyword string, pagination pagination.Pagination) (int64, []*model.Group, error) + + FindJoinGroupID(ctx context.Context, userID string) ([]string, error) } func NewGroupDatabase( @@ -127,14 +129,13 @@ func NewGroupDatabase( groupRequestDB database.GroupRequest, ctxTx tx.Tx, groupHash cache.GroupHash, - syncCount int, ) GroupDatabase { return &groupDatabase{ groupDB: groupDB, groupMemberDB: groupMemberDB, groupRequestDB: groupRequestDB, ctxTx: ctxTx, - cache: redis2.NewGroupCacheRedis(rdb, localCache, groupDB, groupMemberDB, groupRequestDB, groupHash, redis2.GetRocksCacheOptions(), syncCount), + cache: redis2.NewGroupCacheRedis(rdb, localCache, groupDB, groupMemberDB, groupRequestDB, groupHash, redis2.GetRocksCacheOptions()), } } @@ -146,6 +147,10 @@ type groupDatabase struct { cache cache.GroupCache } +func (g *groupDatabase) FindJoinGroupID(ctx context.Context, userID string) ([]string, error) { + return g.cache.GetJoinedGroupIDs(ctx, userID) +} + func (g *groupDatabase) FindGroupMembers(ctx context.Context, groupID string, userIDs []string) ([]*model.GroupMember, error) { return g.cache.GetGroupMembersInfo(ctx, groupID, userIDs) } @@ -243,7 +248,7 @@ func (g *groupDatabase) UpdateGroup(ctx context.Context, groupID string, data ma return err } for _, userID := range userIDs { - if err := g.groupMemberDB.JoinGroupIncrVersion(ctx, userID, []string{groupID}, false); err != nil { + if err := g.groupMemberDB.JoinGroupIncrVersion(ctx, userID, []string{groupID}, model.VersionStateUpdate); err != nil { return err } @@ -276,7 +281,7 @@ func (g *groupDatabase) DismissGroup(ctx context.Context, groupID string, delete } c = c.DelMaxJoinGroupVersion(userIDs...) if len(userIDs) > 0 { - if err := g.groupMemberDB.JoinGroupIncrVersion(ctx, groupID, userIDs, true); err != nil { + if err := g.groupMemberDB.JoinGroupIncrVersion(ctx, groupID, userIDs, model.VersionStateDelete); err != nil { return err } } diff --git a/pkg/common/storage/database/group_member.go b/pkg/common/storage/database/group_member.go index c397eda58a..0051d694f3 100644 --- a/pkg/common/storage/database/group_member.go +++ b/pkg/common/storage/database/group_member.go @@ -34,7 +34,7 @@ type GroupMember interface { TakeGroupMemberNum(ctx context.Context, groupID string) (count int64, err error) FindUserManagedGroupID(ctx context.Context, userID string) (groupIDs []string, err error) IsUpdateRoleLevel(data map[string]any) bool - JoinGroupIncrVersion(ctx context.Context, userID string, groupIDs []string, deleted bool) error + JoinGroupIncrVersion(ctx context.Context, userID string, groupIDs []string, state int32) error FindMemberIncrVersion(ctx context.Context, groupID string, version uint, limit int) (*model.VersionLog, error) FindJoinIncrVersion(ctx context.Context, userID string, version uint, limit int) (*model.VersionLog, error) } diff --git a/pkg/common/storage/database/mgo/friend.go b/pkg/common/storage/database/mgo/friend.go index 646951d7d0..699d9cff6a 100644 --- a/pkg/common/storage/database/mgo/friend.go +++ b/pkg/common/storage/database/mgo/friend.go @@ -66,7 +66,7 @@ func (f *FriendMgo) Create(ctx context.Context, friends []*model.Friend) error { mp[friend.OwnerUserID] = append(mp[friend.OwnerUserID], friend.FriendUserID) } for ownerUserID, friendUserIDs := range mp { - if err := f.owner.IncrVersion(ctx, ownerUserID, friendUserIDs, false); err != nil { + if err := f.owner.IncrVersion(ctx, ownerUserID, friendUserIDs, model.VersionStateInsert); err != nil { return err } } @@ -83,7 +83,7 @@ func (f *FriendMgo) Delete(ctx context.Context, ownerUserID string, friendUserID return mongoutil.IncrVersion(func() error { return mongoutil.DeleteOne(ctx, f.coll, filter) }, func() error { - return f.owner.IncrVersion(ctx, ownerUserID, friendUserIDs, true) + return f.owner.IncrVersion(ctx, ownerUserID, friendUserIDs, model.VersionStateDelete) }) } @@ -99,7 +99,7 @@ func (f *FriendMgo) UpdateByMap(ctx context.Context, ownerUserID string, friendU return mongoutil.IncrVersion(func() error { return mongoutil.UpdateOne(ctx, f.coll, filter, bson.M{"$set": args}, true) }, func() error { - return f.owner.IncrVersion(ctx, ownerUserID, []string{friendUserID}, false) + return f.owner.IncrVersion(ctx, ownerUserID, []string{friendUserID}, model.VersionStateUpdate) }) } @@ -189,7 +189,7 @@ func (f *FriendMgo) UpdateFriends(ctx context.Context, ownerUserID string, frien return mongoutil.IncrVersion(func() error { return mongoutil.Ignore(mongoutil.UpdateMany(ctx, f.coll, filter, update)) }, func() error { - return f.owner.IncrVersion(ctx, ownerUserID, friendUserIDs, false) + return f.owner.IncrVersion(ctx, ownerUserID, friendUserIDs, model.VersionStateUpdate) }) } diff --git a/pkg/common/storage/database/mgo/group_member.go b/pkg/common/storage/database/mgo/group_member.go index cb64c87a41..ece1d7941e 100644 --- a/pkg/common/storage/database/mgo/group_member.go +++ b/pkg/common/storage/database/mgo/group_member.go @@ -70,7 +70,7 @@ func (g *GroupMemberMgo) Create(ctx context.Context, groupMembers []*model.Group gms[member.GroupID] = append(gms[member.GroupID], member.UserID) } for groupID, userIDs := range gms { - if err := g.member.IncrVersion(ctx, groupID, userIDs, false); err != nil { + if err := g.member.IncrVersion(ctx, groupID, userIDs, model.VersionStateInsert); err != nil { return err } } @@ -81,7 +81,7 @@ func (g *GroupMemberMgo) Create(ctx context.Context, groupMembers []*model.Group gms[member.UserID] = append(gms[member.UserID], member.GroupID) } for userID, groupIDs := range gms { - if err := g.join.IncrVersion(ctx, userID, groupIDs, false); err != nil { + if err := g.join.IncrVersion(ctx, userID, groupIDs, model.VersionStateInsert); err != nil { return err } } @@ -97,12 +97,12 @@ func (g *GroupMemberMgo) Delete(ctx context.Context, groupID string, userIDs []s return mongoutil.IncrVersion(func() error { return mongoutil.DeleteMany(ctx, g.coll, filter) }, func() error { - return g.member.IncrVersion(ctx, groupID, userIDs, true) + return g.member.IncrVersion(ctx, groupID, userIDs, model.VersionStateDelete) }, func() error { if len(userIDs) == 0 { return nil } - return g.member.IncrVersion(ctx, groupID, userIDs, true) + return g.member.IncrVersion(ctx, groupID, userIDs, model.VersionStateDelete) }) } @@ -110,9 +110,9 @@ func (g *GroupMemberMgo) UpdateRoleLevel(ctx context.Context, groupID string, us return mongoutil.IncrVersion(func() error { return g.Update(ctx, groupID, userID, bson.M{"role_level": roleLevel}) }, func() error { - return g.member.IncrVersion(ctx, groupID, []string{userID}, true) + return g.member.IncrVersion(ctx, groupID, []string{userID}, model.VersionStateUpdate) }, func() error { - return g.join.IncrVersion(ctx, groupID, []string{userID}, true) + return g.join.IncrVersion(ctx, groupID, []string{userID}, model.VersionStateUpdate) }) } @@ -123,7 +123,7 @@ func (g *GroupMemberMgo) Update(ctx context.Context, groupID string, userID stri return mongoutil.IncrVersion(func() error { return mongoutil.UpdateOne(ctx, g.coll, bson.M{"group_id": groupID, "user_id": userID}, bson.M{"$set": data}, true) }, func() error { - return g.member.IncrVersion(ctx, groupID, []string{userID}, false) + return g.member.IncrVersion(ctx, groupID, []string{userID}, model.VersionStateUpdate) }) } @@ -174,8 +174,8 @@ func (g *GroupMemberMgo) IsUpdateRoleLevel(data map[string]any) bool { return ok } -func (g *GroupMemberMgo) JoinGroupIncrVersion(ctx context.Context, userID string, groupIDs []string, deleted bool) error { - return g.join.IncrVersion(ctx, userID, groupIDs, deleted) +func (g *GroupMemberMgo) JoinGroupIncrVersion(ctx context.Context, userID string, groupIDs []string, state int32) error { + return g.join.IncrVersion(ctx, userID, groupIDs, state) } func (g *GroupMemberMgo) FindMemberIncrVersion(ctx context.Context, groupID string, version uint, limit int) (*model.VersionLog, error) { diff --git a/pkg/common/storage/database/mgo/version_log.go b/pkg/common/storage/database/mgo/version_log.go index 5629c5c00f..8ab11007d1 100644 --- a/pkg/common/storage/database/mgo/version_log.go +++ b/pkg/common/storage/database/mgo/version_log.go @@ -36,7 +36,7 @@ func (l *VersionLogMgo) initIndex(ctx context.Context) error { return err } -func (l *VersionLogMgo) IncrVersion(ctx context.Context, dId string, eIds []string, deleted bool) error { +func (l *VersionLogMgo) IncrVersion(ctx context.Context, dId string, eIds []string, state int32) error { if len(eIds) == 0 { return errs.ErrArgs.WrapMsg("elem id is empty", "dId", dId) } @@ -44,19 +44,19 @@ func (l *VersionLogMgo) IncrVersion(ctx context.Context, dId string, eIds []stri return errs.ErrArgs.WrapMsg("elem id is duplicate", "dId", dId, "eIds", eIds) } now := time.Now() - res, err := l.writeLogBatch(ctx, dId, eIds, deleted, now) + res, err := l.writeLogBatch(ctx, dId, eIds, state, now) if err != nil { return err } if res.MatchedCount > 0 { return nil } - if _, err := l.initDoc(ctx, dId, eIds, deleted, now); err == nil { + if _, err := l.initDoc(ctx, dId, eIds, state, now); err == nil { return nil } else if !mongo.IsDuplicateKeyError(err) { return err } - if res, err := l.writeLogBatch(ctx, dId, eIds, deleted, now); err != nil { + if res, err := l.writeLogBatch(ctx, dId, eIds, state, now); err != nil { return err } else if res.MatchedCount == 0 { return errs.ErrInternalServer.WrapMsg("mongodb return value that should not occur", "coll", l.coll.Name(), "dId", dId, "eIds", eIds) @@ -64,7 +64,7 @@ func (l *VersionLogMgo) IncrVersion(ctx context.Context, dId string, eIds []stri return nil } -func (l *VersionLogMgo) initDoc(ctx context.Context, dId string, eIds []string, deleted bool, now time.Time) (*model.VersionLogTable, error) { +func (l *VersionLogMgo) initDoc(ctx context.Context, dId string, eIds []string, state int32, now time.Time) (*model.VersionLogTable, error) { wl := model.VersionLogTable{ ID: primitive.NewObjectID(), DID: dId, @@ -76,7 +76,7 @@ func (l *VersionLogMgo) initDoc(ctx context.Context, dId string, eIds []string, for _, eId := range eIds { wl.Logs = append(wl.Logs, model.VersionLogElem{ EID: eId, - Deleted: deleted, + State: state, Version: database.FirstVersion, LastUpdate: now, }) @@ -85,7 +85,7 @@ func (l *VersionLogMgo) initDoc(ctx context.Context, dId string, eIds []string, return &wl, err } -func (l *VersionLogMgo) writeLogBatch(ctx context.Context, dId string, eIds []string, deleted bool, now time.Time) (*mongo.UpdateResult, error) { +func (l *VersionLogMgo) writeLogBatch(ctx context.Context, dId string, eIds []string, state int32, now time.Time) (*mongo.UpdateResult, error) { if eIds == nil { eIds = []string{} } @@ -97,7 +97,7 @@ func (l *VersionLogMgo) writeLogBatch(ctx context.Context, dId string, eIds []st elems = append(elems, bson.M{ "e_id": eId, "version": "$version", - "deleted": deleted, + "state": state, "last_update": now, }) } @@ -159,7 +159,7 @@ func (l *VersionLogMgo) FindChangeLog(ctx context.Context, dId string, version u } else if !errors.Is(err, mongo.ErrNoDocuments) { return nil, err } - if res, err := l.initDoc(ctx, dId, nil, false, time.Now()); err == nil { + if res, err := l.initDoc(ctx, dId, nil, 0, time.Now()); err == nil { return res.VersionLog(), nil } else if mongo.IsDuplicateKeyError(err) { return l.findChangeLog(ctx, dId, version, limit) diff --git a/pkg/common/storage/database/version_log.go b/pkg/common/storage/database/version_log.go index 3450d2776e..c9dc09540b 100644 --- a/pkg/common/storage/database/version_log.go +++ b/pkg/common/storage/database/version_log.go @@ -12,7 +12,7 @@ const ( ) type VersionLog interface { - IncrVersion(ctx context.Context, dId string, eIds []string, deleted bool) error + IncrVersion(ctx context.Context, dId string, eIds []string, state int32) error FindChangeLog(ctx context.Context, dId string, version uint, limit int) (*model.VersionLog, error) DeleteAfterUnchangedLog(ctx context.Context, deadline time.Time) error } diff --git a/pkg/common/storage/model/version_log.go b/pkg/common/storage/model/version_log.go index a09f493a86..e1b5fe7c52 100644 --- a/pkg/common/storage/model/version_log.go +++ b/pkg/common/storage/model/version_log.go @@ -1,13 +1,22 @@ package model import ( + "context" + "errors" + "github.com/openimsdk/tools/log" "go.mongodb.org/mongo-driver/bson/primitive" "time" ) +const ( + VersionStateInsert = iota + 1 + VersionStateDelete + VersionStateUpdate +) + type VersionLogElem struct { EID string `bson:"e_id"` - Deleted bool `bson:"deleted"` + State int32 `bson:"state"` Version uint `bson:"version"` LastUpdate time.Time `bson:"last_update"` } @@ -43,12 +52,17 @@ type VersionLog struct { LogLen int `bson:"log_len"` } -func (v *VersionLog) DeleteAndChangeIDs() (delIds []string, changeIds []string) { +func (v *VersionLog) DeleteAndChangeIDs() (insertIds, deleteIds, updateIds []string) { for _, l := range v.Logs { - if l.Deleted { - delIds = append(delIds, l.EID) - } else { - changeIds = append(changeIds, l.EID) + switch l.State { + case VersionStateInsert: + insertIds = append(insertIds, l.EID) + case VersionStateDelete: + deleteIds = append(deleteIds, l.EID) + case VersionStateUpdate: + updateIds = append(updateIds, l.EID) + default: + log.ZError(context.Background(), "invalid version status found", errors.New("dirty database data"), "objID", v.ID.Hex(), "did", v.DID, "elem", l) } } return