Skip to content

Commit

Permalink
fix: update set seq implement. (openimsdk#2911)
Browse files Browse the repository at this point in the history
* update set seq implement.

* revert seq logic.

* Update func logic.

* add log print.
  • Loading branch information
mo3et authored Dec 3, 2024
1 parent 1534575 commit 1447732
Show file tree
Hide file tree
Showing 14 changed files with 79 additions and 53 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ require (
github.com/gorilla/websocket v1.5.1
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/mitchellh/mapstructure v1.5.0
github.com/openimsdk/protocol v0.0.72-alpha.57
github.com/openimsdk/protocol v0.0.72-alpha.59
github.com/openimsdk/tools v0.0.50-alpha.38
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_golang v1.18.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -319,8 +319,8 @@ github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y=
github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM=
github.com/openimsdk/gomake v0.0.14-alpha.5 h1:VY9c5x515lTfmdhhPjMvR3BBRrRquAUCFsz7t7vbv7Y=
github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI=
github.com/openimsdk/protocol v0.0.72-alpha.57 h1:oAVg0SJkDK15L8yDrL0KPG32f3iB/vjEpfpX577p5n4=
github.com/openimsdk/protocol v0.0.72-alpha.57/go.mod h1:Iet+piS/jaS+kWWyj6EEr36mk4ISzIRYjoMSVA4dq2M=
github.com/openimsdk/protocol v0.0.72-alpha.59 h1:+ycb2+68mLKPIo7VrxF0id/GXP6OqZ2/nBM1YZQr7qY=
github.com/openimsdk/protocol v0.0.72-alpha.59/go.mod h1:Iet+piS/jaS+kWWyj6EEr36mk4ISzIRYjoMSVA4dq2M=
github.com/openimsdk/tools v0.0.50-alpha.38 h1:AU6/cvDfN4ciIOwAj8IWEwze3DeEp2cHYPgW3y0OlbU=
github.com/openimsdk/tools v0.0.50-alpha.38/go.mod h1:/Em/fQH46CuWf60+hcmvZyboGCQpSDEb2MdQ4nmQRAk=
github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM=
Expand Down
1 change: 1 addition & 0 deletions internal/push/push_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,7 @@ func (c *ConsumerHandler) DeleteMemberAndSetConversationSeq(ctx context.Context,
if err != nil {
return err
}

return c.conversationRpcClient.SetConversationMaxSeq(ctx, userIDs, conversationID, maxSeq)
}

Expand Down
7 changes: 4 additions & 3 deletions internal/rpc/conversation/conversation.go
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,7 @@ func (c *conversationServer) SetConversationMaxSeq(ctx context.Context, req *pbc
map[string]any{"max_seq": req.MaxSeq}); err != nil {
return nil, err
}

return &pbconversation.SetConversationMaxSeqResp{}, nil
}

Expand Down Expand Up @@ -670,7 +671,7 @@ func (c *conversationServer) GetOwnerConversation(ctx context.Context, req *pbco
}, nil
}

func (c *conversationServer) GetConversationsNeedDestructMsgs(ctx context.Context, _ *pbconversation.GetConversationsNeedDestructMsgsReq) (*pbconversation.GetConversationsNeedDestructMsgsResp, error) {
func (c *conversationServer) GetConversationsNeedClearMsg(ctx context.Context, _ *pbconversation.GetConversationsNeedClearMsgReq) (*pbconversation.GetConversationsNeedClearMsgResp, error) {
num, err := c.conversationDatabase.GetAllConversationIDsNumber(ctx)
if err != nil {
log.ZError(ctx, "GetAllConversationIDsNumber failed", err)
Expand All @@ -694,7 +695,7 @@ func (c *conversationServer) GetConversationsNeedDestructMsgs(ctx context.Contex

conversationIDs, err := c.conversationDatabase.PageConversationIDs(ctx, pagination)
if err != nil {
// log.ZError(ctx, "PageConversationIDs failed", err, "pageNumber", pageNumber)
log.ZError(ctx, "PageConversationIDs failed", err, "pageNumber", pageNumber)
continue
}

Expand All @@ -717,7 +718,7 @@ func (c *conversationServer) GetConversationsNeedDestructMsgs(ctx context.Contex
}
}

return &pbconversation.GetConversationsNeedDestructMsgsResp{Conversations: convert.ConversationsDB2Pb(temp)}, nil
return &pbconversation.GetConversationsNeedClearMsgResp{Conversations: convert.ConversationsDB2Pb(temp)}, nil
}

func (c *conversationServer) GetNotNotifyConversationIDs(ctx context.Context, req *pbconversation.GetNotNotifyConversationIDsReq) (*pbconversation.GetNotNotifyConversationIDsResp, error) {
Expand Down
1 change: 1 addition & 0 deletions internal/rpc/group/group.go
Original file line number Diff line number Diff line change
Expand Up @@ -964,6 +964,7 @@ func (g *groupServer) deleteMemberAndSetConversationSeq(ctx context.Context, gro
if err != nil {
return err
}

return g.conversationRpcClient.SetConversationMaxSeq(ctx, userIDs, conevrsationID, maxSeq)
}

Expand Down
35 changes: 23 additions & 12 deletions internal/rpc/msg/clear.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,32 +12,34 @@ import (
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/mcontext"
"github.com/openimsdk/tools/utils/datautil"
"github.com/openimsdk/tools/utils/idutil"
"github.com/openimsdk/tools/utils/stringutil"
"golang.org/x/sync/errgroup"
)

// hard delete in Database.
func (m *msgServer) ClearMsg(ctx context.Context, req *msg.ClearMsgReq) (_ *msg.ClearMsgResp, err error) {
func (m *msgServer) DestructMsgs(ctx context.Context, req *msg.DestructMsgsReq) (_ *msg.DestructMsgsResp, err error) {
if err := authverify.CheckAdmin(ctx, m.config.Share.IMAdminUserID); err != nil {
return nil, err
}
if req.Timestamp > time.Now().UnixMilli() {
return nil, errs.ErrArgs.WrapMsg("request millisecond timestamp error")
}
var (
docNum int
msgNum int
start = time.Now()
docNum int
msgNum int
start = time.Now()
getLimit = 5000
)

clearMsg := func(ctx context.Context) (bool, error) {
destructMsg := func(ctx context.Context) (bool, error) {
docIDs, err := m.MsgDatabase.GetDocIDs(ctx)
if err != nil {
return false, err
}

msgs, err := m.MsgDatabase.GetBeforeMsg(ctx, req.Timestamp, docIDs, 5000)
msgs, err := m.MsgDatabase.GetBeforeMsg(ctx, req.Timestamp, docIDs, getLimit)
if err != nil {
return false, err
}
Expand All @@ -61,19 +63,19 @@ func (m *msgServer) ClearMsg(ctx context.Context, req *msg.ClearMsgReq) (_ *msg.
return true, nil
}

_, err = clearMsg(ctx)
_, err = destructMsg(ctx)
if err != nil {
log.ZError(ctx, "clear msg failed", err, "docNum", docNum, "msgNum", msgNum, "cost", time.Since(start))
return nil, err
}

log.ZDebug(ctx, "clearing message", "docNum", docNum, "msgNum", msgNum, "cost", time.Since(start))

return &msg.ClearMsgResp{}, nil
return &msg.DestructMsgsResp{}, nil
}

// soft delete for self
func (m *msgServer) DestructMsgs(ctx context.Context, req *msg.DestructMsgsReq) (_ *msg.DestructMsgsResp, err error) {
// soft delete for user self
func (m *msgServer) ClearMsg(ctx context.Context, req *msg.ClearMsgReq) (_ *msg.ClearMsgResp, err error) {
temp := convert.ConversationsPb2DB(req.Conversations)

batchNum := 100
Expand All @@ -93,22 +95,31 @@ func (m *msgServer) DestructMsgs(ctx context.Context, req *msg.DestructMsgsReq)
"msgDestructTime", conversation.MsgDestructTime,
"lastMsgDestructTime", conversation.LatestMsgDestructTime)

seqs, err := m.MsgDatabase.UserMsgsDestruct(handleCtx, conversation.OwnerUserID, conversation.ConversationID, conversation.MsgDestructTime, conversation.LatestMsgDestructTime)
seqs, err := m.MsgDatabase.ClearUserMsgs(handleCtx, conversation.OwnerUserID, conversation.ConversationID, conversation.MsgDestructTime, conversation.LatestMsgDestructTime)
if err != nil {
log.ZError(handleCtx, "user msg destruct failed", err, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID)
continue
}

if len(seqs) > 0 {
minseq := datautil.Max(seqs...)

// update
if err := m.Conversation.UpdateConversation(handleCtx,
&pbconversation.UpdateConversationReq{
UserIDs: []string{conversation.OwnerUserID},
ConversationID: conversation.ConversationID,
LatestMsgDestructTime: wrapperspb.Int64(time.Now().UnixMilli())}); err != nil {
LatestMsgDestructTime: wrapperspb.Int64(time.Now().UnixMilli()),
MinSeq: wrapperspb.Int64(minseq),
}); err != nil {
log.ZError(handleCtx, "updateUsersConversationField failed", err, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID)
continue
}

if err := m.Conversation.SetConversationMinSeq(handleCtx, []string{conversation.OwnerUserID}, conversation.ConversationID, minseq); err != nil {
return err
}

// if you need Notify SDK client userseq is update.
// m.msgNotificationSender.UserDeleteMsgsNotification(handleCtx, conversation.OwnerUserID, conversation.ConversationID, seqs)
}
Expand Down
35 changes: 18 additions & 17 deletions internal/tools/cron_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,42 +76,43 @@ func Start(ctx context.Context, config *CronTaskConfig) error {
crontab := cron.New()

// scheduled hard delete outdated Msgs in specific time.
clearMsgFunc := func() {
destructMsgsFunc := func() {
now := time.Now()
deltime := now.Add(-time.Hour * 24 * time.Duration(config.CronTask.RetainChatRecords))
ctx := mcontext.SetOperationID(ctx, fmt.Sprintf("cron_%d_%d", os.Getpid(), deltime.UnixMilli()))
log.ZDebug(ctx, "clear chat records", "deltime", deltime, "timestamp", deltime.UnixMilli())
log.ZDebug(ctx, "Destruct chat records", "deltime", deltime, "timestamp", deltime.UnixMilli())

if _, err := msgClient.ClearMsg(ctx, &msg.ClearMsgReq{Timestamp: deltime.UnixMilli()}); err != nil {
log.ZError(ctx, "cron clear chat records failed", err, "deltime", deltime, "cont", time.Since(now))
if _, err := msgClient.DestructMsgs(ctx, &msg.DestructMsgsReq{Timestamp: deltime.UnixMilli()}); err != nil {
log.ZError(ctx, "cron destruct chat records failed", err, "deltime", deltime, "cont", time.Since(now))
return
}
log.ZDebug(ctx, "cron clear chat records success", "deltime", deltime, "cont", time.Since(now))
log.ZDebug(ctx, "cron destruct chat records success", "deltime", deltime, "cont", time.Since(now))
}
if _, err := crontab.AddFunc(config.CronTask.CronExecuteTime, clearMsgFunc); err != nil {
if _, err := crontab.AddFunc(config.CronTask.CronExecuteTime, destructMsgsFunc); err != nil {
return errs.Wrap(err)
}

// scheduled soft delete outdated Msgs in specific time when user set `is_msg_destruct` feature.
msgDestructFunc := func() {
clearMsgFunc := func() {
now := time.Now()
ctx := mcontext.SetOperationID(ctx, fmt.Sprintf("cron_%d_%d", os.Getpid(), now.UnixMilli()))
log.ZDebug(ctx, "msg destruct cron start", "now", now)
log.ZDebug(ctx, "clear msg cron start", "now", now)

conversations, err := conversationClient.GetConversationsNeedDestructMsgs(ctx, &pbconversation.GetConversationsNeedDestructMsgsReq{})
conversations, err := conversationClient.GetConversationsNeedClearMsg(ctx, &pbconversation.GetConversationsNeedClearMsgReq{})
if err != nil {
log.ZError(ctx, "Get conversation need Destruct msgs failed.", err)
return
} else {
_, err := msgClient.DestructMsgs(ctx, &msg.DestructMsgsReq{Conversations: conversations.Conversations})
if err != nil {
log.ZError(ctx, "Destruct Msgs failed.", err)
return
}
}
log.ZDebug(ctx, "msg destruct cron task completed", "cont", time.Since(now))

_, err = msgClient.ClearMsg(ctx, &msg.ClearMsgReq{Conversations: conversations.Conversations})
if err != nil {
log.ZError(ctx, "Clear Msg failed.", err)
return
}

log.ZDebug(ctx, "clear msg cron task completed", "cont", time.Since(now))
}
if _, err := crontab.AddFunc(config.CronTask.CronExecuteTime, msgDestructFunc); err != nil {
if _, err := crontab.AddFunc(config.CronTask.CronExecuteTime, clearMsgFunc); err != nil {
return errs.Wrap(err)
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/common/storage/controller/conversation.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ func (c *conversationDatabase) CreateConversation(ctx context.Context, conversat
if conversation.RecvMsgOpt == constant.ReceiveNotNotifyMessage {
notNotifyUserIDs = append(notNotifyUserIDs, conversation.OwnerUserID)
}
if conversation.IsPinned == true {
if conversation.IsPinned {
pinnedUserIDs = append(pinnedUserIDs, conversation.OwnerUserID)
}
}
Expand Down
31 changes: 20 additions & 11 deletions pkg/common/storage/controller/msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@ type CommonMsgDatabase interface {
// DeleteConversationMsgsAndSetMinSeq deletes conversation messages and resets the minimum sequence number. If `remainTime` is 0, all messages are deleted (this method does not delete Redis
// cache).
DeleteConversationMsgsAndSetMinSeq(ctx context.Context, conversationID string, remainTime int64) error
// UserMsgsDestruct marks messages for deletion based on destruct time and returns a list of sequence numbers for marked messages.
UserMsgsDestruct(ctx context.Context, userID string, conversationID string, destructTime int64, lastMsgDestructTime time.Time) (seqs []int64, err error)
// ClearUserMsgs marks messages for deletion based on clear time and returns a list of sequence numbers for marked messages.
ClearUserMsgs(ctx context.Context, userID string, conversationID string, clearTime int64, lastMsgClearTime time.Time) (seqs []int64, err error)
// DeleteUserMsgsBySeqs allows a user to delete messages based on sequence numbers.
DeleteUserMsgsBySeqs(ctx context.Context, userID string, conversationID string, seqs []int64) error
// DeleteMsgsPhysicalBySeqs physically deletes messages by emptying them based on sequence numbers.
Expand Down Expand Up @@ -92,7 +92,7 @@ type CommonMsgDatabase interface {
RangeGroupSendCount(ctx context.Context, start time.Time, end time.Time, ase bool, pageNumber int32, showNumber int32) (msgCount int64, userCount int64, groups []*model.GroupCount, dateCount map[string]int64, err error)
ConvertMsgsDocLen(ctx context.Context, conversationIDs []string)

// clear msg
// get Msg when destruct msg before
GetBeforeMsg(ctx context.Context, ts int64, docIds []string, limit int) ([]*model.MsgDocModel, error)
DeleteDocMsgBefore(ctx context.Context, ts int64, doc *model.MsgDocModel) ([]int, error)

Expand Down Expand Up @@ -528,10 +528,10 @@ func (db *commonMsgDatabase) DeleteConversationMsgsAndSetMinSeq(ctx context.Cont
return db.seqConversation.SetMinSeq(ctx, conversationID, minSeq)
}

func (db *commonMsgDatabase) UserMsgsDestruct(ctx context.Context, userID string, conversationID string, destructTime int64, lastMsgDestructTime time.Time) (seqs []int64, err error) {
func (db *commonMsgDatabase) ClearUserMsgs(ctx context.Context, userID string, conversationID string, clearTime int64, lastMsgClearTime time.Time) (seqs []int64, err error) {
var index int64
for {
// from oldest 2 newest
// from oldest 2 newest, ASC
msgDocModel, err := db.msgDocDatabase.GetMsgDocModelByIndex(ctx, conversationID, index, 1)
if err != nil || msgDocModel.DocID == "" {
if err != nil {
Expand All @@ -544,15 +544,19 @@ func (db *commonMsgDatabase) UserMsgsDestruct(ctx context.Context, userID string
// If an error is reported, or the error cannot be obtained, it is physically deleted and seq delMongoMsgsPhysical(delStruct.delDocIDList) is returned to end the recursion
break
}

index++
// && msgDocModel.Msg[0].Msg.SendTime > lastMsgDestructTime.UnixMilli()

// && msgDocModel.Msg[0].Msg.SendTime > lastMsgClearTime.UnixMilli()
if len(msgDocModel.Msg) > 0 {
i := 0
var over bool
for _, msg := range msgDocModel.Msg {
i++
if msg != nil && msg.Msg != nil && msg.Msg.SendTime+destructTime*1000 <= time.Now().UnixMilli() {
if msg.Msg.SendTime+destructTime*1000 > lastMsgDestructTime.UnixMilli() && !datautil.Contain(userID, msg.DelList...) {
// over clear time, need to clear
if msg != nil && msg.Msg != nil && msg.Msg.SendTime+clearTime*1000 <= time.Now().UnixMilli() {
// if msg is not in del list, add to del list
if msg.Msg.SendTime+clearTime*1000 > lastMsgClearTime.UnixMilli() && !datautil.Contain(userID, msg.DelList...) {
seqs = append(seqs, msg.Msg.Seq)
}
} else {
Expand All @@ -567,13 +571,18 @@ func (db *commonMsgDatabase) UserMsgsDestruct(ctx context.Context, userID string
}
}

log.ZDebug(ctx, "UserMsgsDestruct", "conversationID", conversationID, "userID", userID, "seqs", seqs)
log.ZDebug(ctx, "ClearUserMsgs", "conversationID", conversationID, "userID", userID, "seqs", seqs)

// have msg need to destruct
if len(seqs) > 0 {
userMinSeq := seqs[len(seqs)-1] + 1
currentUserMinSeq, err := db.seqUser.GetUserMinSeq(ctx, conversationID, userID)
// update min seq to clear after
userMinSeq := seqs[len(seqs)-1] + 1 // user min seq when clear after
currentUserMinSeq, err := db.seqUser.GetUserMinSeq(ctx, conversationID, userID) // user min seq when clear before
if err != nil {
return nil, err
}

// if before < after, update min seq
if currentUserMinSeq < userMinSeq {
if err := db.seqUser.SetUserMinSeq(ctx, conversationID, userID, userMinSeq); err != nil {
return nil, err
Expand Down
1 change: 1 addition & 0 deletions pkg/common/storage/database/conversation.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package database

import (
"context"

"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/tools/db/pagination"
)
Expand Down
3 changes: 2 additions & 1 deletion pkg/common/storage/database/mgo/conversation.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@ package mgo

import (
"context"
"time"

"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"time"

"github.com/openimsdk/protocol/constant"
"github.com/openimsdk/tools/db/mongoutil"
Expand Down
2 changes: 1 addition & 1 deletion pkg/common/storage/database/mgo/group_member.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ type GroupMemberMgo struct {
}

func (g *GroupMemberMgo) memberSort() any {
return bson.D{{"role_level", -1}, {"create_time", 1}}
return bson.D{{Key: "role_level", Value: -1}, {Key: "create_time", Value: 1}}
}

func (g *GroupMemberMgo) Create(ctx context.Context, groupMembers []*model.GroupMember) (err error) {
Expand Down
4 changes: 2 additions & 2 deletions pkg/rpcclient/conversation.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,8 @@ func (c *ConversationRpcClient) GetConversationNotReceiveMessageUserIDs(ctx cont
return resp.UserIDs, nil
}

func (c *ConversationRpcClient) GetConversationsNeedDestructMsgs(ctx context.Context) ([]*pbconversation.Conversation, error) {
resp, err := c.Client.GetConversationsNeedDestructMsgs(ctx, &pbconversation.GetConversationsNeedDestructMsgsReq{})
func (c *ConversationRpcClient) GetConversationsNeedClearMsg(ctx context.Context) ([]*pbconversation.Conversation, error) {
resp, err := c.Client.GetConversationsNeedClearMsg(ctx, &pbconversation.GetConversationsNeedClearMsgReq{})
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/rpcclient/msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,8 +244,8 @@ func (m *MessageRpcClient) GetConversationMaxSeq(ctx context.Context, conversati
return resp.MaxSeq, nil
}

func (m *MessageRpcClient) ClearMsg(ctx context.Context, ts int64) error {
_, err := m.Client.ClearMsg(ctx, &msg.ClearMsgReq{Timestamp: ts})
func (m *MessageRpcClient) DestructMsgs(ctx context.Context, ts int64) error {
_, err := m.Client.DestructMsgs(ctx, &msg.DestructMsgsReq{Timestamp: ts})
return err
}

Expand Down

0 comments on commit 1447732

Please sign in to comment.