From e9895062cb1fd7c3fcf788621591fbb87e991143 Mon Sep 17 00:00:00 2001 From: chao <48119764+withchao@users.noreply.github.com> Date: Fri, 20 Dec 2024 15:45:37 +0800 Subject: [PATCH] feat: Optimize Scheduled Task (#2985) * pb * fix: Modifying other fields while setting IsPrivateChat does not take effect * fix: quote message error revoke * refactoring scheduled tasks * refactoring scheduled tasks * refactoring scheduled tasks * refactoring scheduled tasks * refactoring scheduled tasks * refactoring scheduled tasks --- go.mod | 4 +- go.sum | 8 +- internal/api/router.go | 1 + internal/rpc/conversation/conversation.go | 50 +++++++ internal/rpc/msg/clear.go | 85 ++++++------ internal/rpc/third/s3.go | 99 +++----------- internal/rpc/third/third.go | 14 +- internal/tools/cron_task.go | 125 +++++++----------- internal/tools/cron_test.go | 63 +++++++++ internal/tools/msg.go | 36 +++++ internal/tools/s3.go | 79 +++++++++++ internal/tools/user_msg.go | 34 +++++ pkg/common/storage/controller/conversation.go | 6 + pkg/common/storage/controller/msg.go | 110 ++++++++------- pkg/common/storage/controller/s3.go | 24 ++-- pkg/common/storage/database/conversation.go | 1 + .../storage/database/mgo/conversation.go | 32 +++++ pkg/common/storage/database/mgo/msg.go | 119 +++++++++-------- pkg/common/storage/database/mgo/msg_test.go | 101 +++++++++----- pkg/common/storage/database/mgo/object.go | 29 ++-- pkg/common/storage/database/msg.go | 6 +- pkg/common/storage/database/object.go | 7 +- 22 files changed, 640 insertions(+), 393 deletions(-) create mode 100644 internal/tools/cron_test.go create mode 100644 internal/tools/msg.go create mode 100644 internal/tools/s3.go create mode 100644 internal/tools/user_msg.go diff --git a/go.mod b/go.mod index 4eaa18ccc8..6c1d421c84 100644 --- a/go.mod +++ b/go.mod @@ -14,8 +14,8 @@ require ( github.com/gorilla/websocket v1.5.1 github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/mitchellh/mapstructure v1.5.0 - github.com/openimsdk/protocol v0.0.72-alpha.66 - github.com/openimsdk/tools v0.0.50-alpha.57 + github.com/openimsdk/protocol v0.0.72-alpha.67 + github.com/openimsdk/tools v0.0.50-alpha.58 github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_golang v1.18.0 github.com/stretchr/testify v1.9.0 diff --git a/go.sum b/go.sum index 6e283eb668..86ed9ed6e3 100644 --- a/go.sum +++ b/go.sum @@ -347,10 +347,10 @@ github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y= github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM= github.com/openimsdk/gomake v0.0.15-alpha.2 h1:5Q8yl8ezy2yx+q8/ucU/t4kJnDfCzNOrkXcDACCqtyM= github.com/openimsdk/gomake v0.0.15-alpha.2/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= -github.com/openimsdk/protocol v0.0.72-alpha.66 h1:5KoDY6M4T+pXg449ScF6hqeQ+WenBwNyUJn/t8W0oBQ= -github.com/openimsdk/protocol v0.0.72-alpha.66/go.mod h1:Iet+piS/jaS+kWWyj6EEr36mk4ISzIRYjoMSVA4dq2M= -github.com/openimsdk/tools v0.0.50-alpha.57 h1:oIKV6vYhqp7TRmZ6Pe+r9RNl1D5s7aB/kE9yQVEWcSY= -github.com/openimsdk/tools v0.0.50-alpha.57/go.mod h1:muCtxguNJv8lFwLei27UASu2Nvg4ERSeN0R4K5tivk0= +github.com/openimsdk/protocol v0.0.72-alpha.67 h1:zlLbVkoT0OYsjO2RCutQuDFllcfNvZfdYchvlR6UIe0= +github.com/openimsdk/protocol v0.0.72-alpha.67/go.mod h1:Iet+piS/jaS+kWWyj6EEr36mk4ISzIRYjoMSVA4dq2M= +github.com/openimsdk/tools v0.0.50-alpha.58 h1:hkFL02Bzzp/l5x+tb7kJ9zes7hilh65EQ4qEIthsQX4= +github.com/openimsdk/tools v0.0.50-alpha.58/go.mod h1:muCtxguNJv8lFwLei27UASu2Nvg4ERSeN0R4K5tivk0= github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM= github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs= github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= diff --git a/internal/api/router.go b/internal/api/router.go index d6bf4e130b..5e8038068c 100644 --- a/internal/api/router.go +++ b/internal/api/router.go @@ -164,6 +164,7 @@ func newGinRouter(disCov discovery.SvcDiscoveryRegistry, config *Config) *gin.En authRouterGroup.POST("/get_user_token", a.GetUserToken) authRouterGroup.POST("/parse_token", a.ParseToken) authRouterGroup.POST("/force_logout", a.ForceLogout) + } // Third service thirdGroup := r.Group("/third") diff --git a/internal/rpc/conversation/conversation.go b/internal/rpc/conversation/conversation.go index f534ca6dea..696ada1521 100644 --- a/internal/rpc/conversation/conversation.go +++ b/internal/rpc/conversation/conversation.go @@ -763,3 +763,53 @@ func (c *conversationServer) GetPinnedConversationIDs(ctx context.Context, req * } return &pbconversation.GetPinnedConversationIDsResp{ConversationIDs: conversationIDs}, nil } + +func (c *conversationServer) ClearUserConversationMsg(ctx context.Context, req *pbconversation.ClearUserConversationMsgReq) (*pbconversation.ClearUserConversationMsgResp, error) { + conversations, err := c.conversationDatabase.FindRandConversation(ctx, req.Timestamp, int(req.Limit)) + if err != nil { + return nil, err + } + latestMsgDestructTime := time.UnixMilli(req.Timestamp) + for i, conversation := range conversations { + if conversation.IsMsgDestruct == false || conversation.MsgDestructTime == 0 { + continue + } + rcpReq := &pbmsg.GetLastMessageSeqByTimeReq{ConversationID: conversation.ConversationID, Time: req.Timestamp - conversation.MsgDestructTime} + resp, err := pbmsg.GetLastMessageSeqByTime.Invoke(ctx, rcpReq) + if err != nil { + return nil, err + } + if resp.Seq <= 0 { + log.ZDebug(ctx, "ClearUserConversationMsg GetLastMessageSeqByTime seq <= 0", "index", i, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID, "msgDestructTime", conversation.MsgDestructTime, "seq", resp.Seq) + if err := c.setConversationMinSeqAndLatestMsgDestructTime(ctx, conversation.ConversationID, conversation.OwnerUserID, -1, latestMsgDestructTime); err != nil { + return nil, err + } + continue + } + resp.Seq++ + if err := c.setConversationMinSeqAndLatestMsgDestructTime(ctx, conversation.ConversationID, conversation.OwnerUserID, resp.Seq, latestMsgDestructTime); err != nil { + return nil, err + } + log.ZDebug(ctx, "ClearUserConversationMsg set min seq", "index", i, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID, "seq", resp.Seq, "msgDestructTime", conversation.MsgDestructTime) + } + return &pbconversation.ClearUserConversationMsgResp{Count: int32(len(conversations))}, nil +} + +func (c *conversationServer) setConversationMinSeqAndLatestMsgDestructTime(ctx context.Context, conversationID string, ownerUserID string, minSeq int64, latestMsgDestructTime time.Time) error { + update := map[string]any{ + "latest_msg_destruct_time": latestMsgDestructTime, + } + if minSeq >= 0 { + req := &pbmsg.SetUserConversationMinSeqReq{ConversationID: conversationID, OwnerUserID: []string{ownerUserID}, MinSeq: minSeq} + if _, err := pbmsg.SetUserConversationMinSeqCaller.Invoke(ctx, req); err != nil { + return err + } + update["min_seq"] = minSeq + } + + if err := c.conversationDatabase.UpdateUsersConversationField(ctx, []string{ownerUserID}, conversationID, update); err != nil { + return err + } + c.conversationNotificationSender.ConversationChangeNotification(ctx, ownerUserID, []string{conversationID}) + return nil +} diff --git a/internal/rpc/msg/clear.go b/internal/rpc/msg/clear.go index 7d62e7c8fa..7a2d363009 100644 --- a/internal/rpc/msg/clear.go +++ b/internal/rpc/msg/clear.go @@ -2,6 +2,7 @@ package msg import ( "context" + "strings" "time" "github.com/openimsdk/open-im-server/v3/pkg/authverify" @@ -9,7 +10,6 @@ import ( pbconv "github.com/openimsdk/protocol/conversation" "github.com/openimsdk/protocol/msg" "github.com/openimsdk/protocol/wrapperspb" - "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/log" "github.com/openimsdk/tools/mcontext" "github.com/openimsdk/tools/utils/datautil" @@ -19,63 +19,50 @@ import ( ) // hard delete in Database. -func (m *msgServer) DestructMsgs(ctx context.Context, req *msg.DestructMsgsReq) (_ *msg.DestructMsgsResp, err error) { +func (m *msgServer) DestructMsgs(ctx context.Context, req *msg.DestructMsgsReq) (*msg.DestructMsgsResp, error) { if err := authverify.CheckAdmin(ctx, m.config.Share.IMAdminUserID); err != nil { return nil, err } - if req.Timestamp > time.Now().UnixMilli() { - return nil, errs.ErrArgs.WrapMsg("request millisecond timestamp error") + docs, err := m.MsgDatabase.GetRandBeforeMsg(ctx, req.Timestamp, int(req.Limit)) + if err != nil { + return nil, err } - var ( - docNum int - msgNum int - start = time.Now() - getLimit = 5000 - ) - - destructMsg := func(ctx context.Context) (bool, error) { - docIDs, err := m.MsgDatabase.GetDocIDs(ctx) - if err != nil { - return false, err - } - - msgs, err := m.MsgDatabase.GetBeforeMsg(ctx, req.Timestamp, docIDs, getLimit) - if err != nil { - return false, err + for i, doc := range docs { + if err := m.MsgDatabase.DeleteDoc(ctx, doc.DocID); err != nil { + return nil, err } - if len(msgs) == 0 { - return false, nil + log.ZDebug(ctx, "DestructMsgs delete doc", "index", i, "docID", doc.DocID) + index := strings.LastIndex(doc.DocID, ":") + if index < 0 { + continue } - - for _, msg := range msgs { - index, err := m.MsgDatabase.DeleteDocMsgBefore(ctx, req.Timestamp, msg) - if err != nil { - return false, err + var minSeq int64 + for _, model := range doc.Msg { + if model.Msg == nil { + continue } - if len(index) == 0 { - return false, errs.ErrInternalServer.WrapMsg("delete doc msg failed") + if model.Msg.Seq > minSeq { + minSeq = model.Msg.Seq } - - docNum++ - msgNum += len(index) } - - return true, nil - } - - _, err = destructMsg(ctx) - if err != nil { - log.ZError(ctx, "clear msg failed", err, "docNum", docNum, "msgNum", msgNum, "cost", time.Since(start)) - return nil, err + if minSeq <= 0 { + continue + } + conversationID := doc.DocID[:index] + if conversationID == "" { + continue + } + minSeq++ + if err := m.MsgDatabase.SetMinSeq(ctx, conversationID, minSeq); err != nil { + return nil, err + } + log.ZDebug(ctx, "DestructMsgs delete doc set min seq", "index", i, "docID", doc.DocID, "conversationID", conversationID, "setMinSeq", minSeq) } - - log.ZDebug(ctx, "clearing message", "docNum", docNum, "msgNum", msgNum, "cost", time.Since(start)) - - return &msg.DestructMsgsResp{}, nil + return &msg.DestructMsgsResp{Count: int32(len(docs))}, nil } // soft delete for user self -func (m *msgServer) ClearMsg(ctx context.Context, req *msg.ClearMsgReq) (_ *msg.ClearMsgResp, err error) { +func (m *msgServer) ClearMsg(ctx context.Context, req *msg.ClearMsgReq) (*msg.ClearMsgResp, error) { temp := convert.ConversationsPb2DB(req.Conversations) batchNum := 100 @@ -137,3 +124,11 @@ func (m *msgServer) ClearMsg(ctx context.Context, req *msg.ClearMsgReq) (_ *msg. return nil, nil } + +func (m *msgServer) GetLastMessageSeqByTime(ctx context.Context, req *msg.GetLastMessageSeqByTimeReq) (*msg.GetLastMessageSeqByTimeResp, error) { + seq, err := m.MsgDatabase.GetLastMessageSeqByTime(ctx, req.ConversationID, req.Time) + if err != nil { + return nil, err + } + return &msg.GetLastMessageSeqByTimeResp{Seq: seq}, nil +} diff --git a/internal/rpc/third/s3.go b/internal/rpc/third/s3.go index b4b064d7f4..8796fe824e 100644 --- a/internal/rpc/third/s3.go +++ b/internal/rpc/third/s3.go @@ -19,17 +19,14 @@ import ( "encoding/base64" "encoding/hex" "encoding/json" + "github.com/openimsdk/open-im-server/v3/pkg/authverify" "path" "strconv" "time" - "github.com/openimsdk/open-im-server/v3/pkg/common/config" - "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" - "go.mongodb.org/mongo-driver/mongo" - "github.com/google/uuid" "github.com/openimsdk/open-im-server/v3/pkg/common/servererrs" - "github.com/openimsdk/protocol/sdkws" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" "github.com/openimsdk/protocol/third" "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/log" @@ -288,87 +285,35 @@ func (t *thirdServer) apiAddress(prefix, name string) string { } func (t *thirdServer) DeleteOutdatedData(ctx context.Context, req *third.DeleteOutdatedDataReq) (*third.DeleteOutdatedDataResp, error) { - var conf config.Third - expireTime := time.UnixMilli(req.ExpireTime) - - findPagination := &sdkws.RequestPagination{ - PageNumber: 1, - ShowNumber: 500, + if err := authverify.CheckAdmin(ctx, t.config.Share.IMAdminUserID); err != nil { + return nil, err } - + engine := t.config.RpcConfig.Object.Enable + expireTime := time.UnixMilli(req.ExpireTime) // Find all expired data in S3 database - total, models, err := t.s3dataBase.FindNeedDeleteObjectByDB(ctx, expireTime, req.ObjectGroup, findPagination) - if err != nil && errs.Unwrap(err) != mongo.ErrNoDocuments { - return nil, errs.Wrap(err) - } - - if total == 0 { - log.ZDebug(ctx, "Not have OutdatedData", "delete Total", total) - return &third.DeleteOutdatedDataResp{Count: int32(total)}, nil - } - - needDelObjectKeys := make([]string, len(models)) - for _, model := range models { - needDelObjectKeys = append(needDelObjectKeys, model.Key) + models, err := t.s3dataBase.FindExpirationObject(ctx, engine, expireTime, req.ObjectGroup, int64(req.Limit)) + if err != nil { + return nil, err } - - // Remove duplicate keys, have the same key use in different models - needDelObjectKeys = datautil.Distinct(needDelObjectKeys) - - for _, key := range needDelObjectKeys { - // Find all models by key - keyModels, err := t.s3dataBase.FindModelsByKey(ctx, key) - if err != nil && errs.Unwrap(err) != mongo.ErrNoDocuments { + for i, obj := range models { + if err := t.s3dataBase.DeleteSpecifiedData(ctx, engine, []string{obj.Name}); err != nil { return nil, errs.Wrap(err) } - - // check keyModels, if all keyModels. - needDelKey := true // Default can delete - for _, keymodel := range keyModels { - // If group is empty or CreateTime is after expireTime, can't delete this key - if keymodel.Group == "" || keymodel.CreateTime.After(expireTime) { - needDelKey = false - break - } - } - - // If this object is not referenced by not expire data, delete it - if needDelKey && t.minio != nil { - // If have a thumbnail, delete it - thumbnailKey, _ := t.getMinioImageThumbnailKey(ctx, key) - if thumbnailKey != "" { - err := t.s3dataBase.DeleteObject(ctx, thumbnailKey) - if err != nil { - log.ZWarn(ctx, "Delete thumbnail object is error:", errs.Wrap(err), "thumbnailKey", thumbnailKey) - } - } - - // Delete object - err = t.s3dataBase.DeleteObject(ctx, key) - if err != nil { - log.ZWarn(ctx, "Delete object is error", errs.Wrap(err), "object key", key) - } - - // Delete cache key - err = t.s3dataBase.DelS3Key(ctx, conf.Object.Enable, key) - if err != nil { - log.ZWarn(ctx, "Delete cache key is error:", errs.Wrap(err), "cache S3 key:", key) - } + if err := t.s3dataBase.DelS3Key(ctx, engine, obj.Name); err != nil { + return nil, err } - } - - // handle delete data in S3 database - for _, model := range models { - // Delete all expired data row in S3 database - err := t.s3dataBase.DeleteSpecifiedData(ctx, model.Engine, model.Name) + count, err := t.s3dataBase.GetKeyCount(ctx, engine, obj.Key) if err != nil { - return nil, errs.Wrap(err) + return nil, err + } + log.ZDebug(ctx, "delete s3 object record", "index", i, "s3", obj, "count", count) + if count == 0 { + if err := t.s3.DeleteObject(ctx, obj.Key); err != nil { + return nil, err + } } } - - log.ZDebug(ctx, "DeleteOutdatedData", "delete Total", total) - - return &third.DeleteOutdatedDataResp{Count: int32(total)}, nil + return &third.DeleteOutdatedDataResp{Count: int32(len(models))}, nil } type FormDataMate struct { diff --git a/internal/rpc/third/third.go b/internal/rpc/third/third.go index 77e6d459fa..dc964fdb15 100644 --- a/internal/rpc/third/third.go +++ b/internal/rpc/third/third.go @@ -44,7 +44,7 @@ type thirdServer struct { s3dataBase controller.S3Database defaultExpire time.Duration config *Config - minio *minio.Minio + s3 s3.Interface } type Config struct { @@ -79,13 +79,11 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg // Select the oss method according to the profile policy enable := config.RpcConfig.Object.Enable var ( - o s3.Interface - minioCli *minio.Minio + o s3.Interface ) switch enable { case "minio": - minioCli, err = minio.NewMinio(ctx, redis.NewMinioCache(rdb), *config.MinioConfig.Build()) - o = minioCli + o, err = minio.NewMinio(ctx, redis.NewMinioCache(rdb), *config.MinioConfig.Build()) case "cos": o, err = cos.NewCos(*config.RpcConfig.Object.Cos.Build()) case "oss": @@ -106,15 +104,11 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg s3dataBase: controller.NewS3Database(rdb, o, s3db), defaultExpire: time.Hour * 24 * 7, config: config, - minio: minioCli, + s3: o, }) return nil } -func (t *thirdServer) getMinioImageThumbnailKey(ctx context.Context, name string) (string, error) { - return t.minio.GetImageThumbnailKey(ctx, name) -} - func (t *thirdServer) FcmUpdateToken(ctx context.Context, req *third.FcmUpdateTokenReq) (resp *third.FcmUpdateTokenResp, err error) { err = t.thirdDatabase.FcmUpdateToken(ctx, req.Account, int(req.PlatformID), req.FcmToken, req.ExpireTime) if err != nil { diff --git a/internal/tools/cron_task.go b/internal/tools/cron_task.go index 2fe7d0e395..049a6199cf 100644 --- a/internal/tools/cron_task.go +++ b/internal/tools/cron_task.go @@ -2,10 +2,6 @@ package tools import ( "context" - "fmt" - "os" - "time" - "github.com/openimsdk/open-im-server/v3/pkg/common/config" kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister" pbconversation "github.com/openimsdk/protocol/conversation" @@ -60,87 +56,58 @@ func Start(ctx context.Context, config *CronTaskConfig) error { return err } - msgClient := msg.NewMsgClient(msgConn) - conversationClient := pbconversation.NewConversationClient(conversationConn) - thirdClient := third.NewThirdClient(thirdConn) - - crontab := cron.New() - - // scheduled hard delete outdated Msgs in specific time. - destructMsgsFunc := func() { - now := time.Now() - deltime := now.Add(-time.Hour * 24 * time.Duration(config.CronTask.RetainChatRecords)) - ctx := mcontext.SetOperationID(ctx, fmt.Sprintf("cron_%d_%d", os.Getpid(), deltime.UnixMilli())) - log.ZDebug(ctx, "Destruct chat records", "deltime", deltime, "timestamp", deltime.UnixMilli()) - - if _, err := msgClient.DestructMsgs(ctx, &msg.DestructMsgsReq{Timestamp: deltime.UnixMilli()}); err != nil { - log.ZError(ctx, "cron destruct chat records failed", err, "deltime", deltime, "cont", time.Since(now)) - return - } - log.ZDebug(ctx, "cron destruct chat records success", "deltime", deltime, "cont", time.Since(now)) - } - if _, err := crontab.AddFunc(config.CronTask.CronExecuteTime, destructMsgsFunc); err != nil { - return errs.Wrap(err) + srv := &cronServer{ + ctx: ctx, + config: config, + cron: cron.New(), + msgClient: msg.NewMsgClient(msgConn), + conversationClient: pbconversation.NewConversationClient(conversationConn), + thirdClient: third.NewThirdClient(thirdConn), } - // scheduled soft delete outdated Msgs in specific time when user set `is_msg_destruct` feature. - clearMsgFunc := func() { - now := time.Now() - ctx := mcontext.SetOperationID(ctx, fmt.Sprintf("cron_%d_%d", os.Getpid(), now.UnixMilli())) - log.ZDebug(ctx, "clear msg cron start", "now", now) - - conversations, err := conversationClient.GetConversationsNeedClearMsg(ctx, &pbconversation.GetConversationsNeedClearMsgReq{}) - if err != nil { - log.ZError(ctx, "Get conversation need Destruct msgs failed.", err) - return - } - - _, err = msgClient.ClearMsg(ctx, &msg.ClearMsgReq{Conversations: conversations.Conversations}) - if err != nil { - log.ZError(ctx, "Clear Msg failed.", err) - return - } - - log.ZDebug(ctx, "clear msg cron task completed", "cont", time.Since(now)) - } - if _, err := crontab.AddFunc(config.CronTask.CronExecuteTime, clearMsgFunc); err != nil { - return errs.Wrap(err) + if err := srv.registerClearS3(); err != nil { + return err } - - // scheduled delete outdated file Objects and their datas in specific time. - deleteObjectFunc := func() { - now := time.Now() - executeNum := 5 - // number of pagination. if need modify, need update value in third.DeleteOutdatedData - pageShowNumber := 500 - deleteTime := now.Add(-time.Hour * 24 * time.Duration(config.CronTask.FileExpireTime)) - ctx := mcontext.SetOperationID(ctx, fmt.Sprintf("cron_%d_%d", os.Getpid(), deleteTime.UnixMilli())) - log.ZDebug(ctx, "deleteoutDatedData", "deletetime", deleteTime, "timestamp", deleteTime.UnixMilli()) - - if len(config.CronTask.DeleteObjectType) == 0 { - log.ZDebug(ctx, "cron deleteoutDatedData not type need delete", "deletetime", deleteTime, "DeleteObjectType", config.CronTask.DeleteObjectType, "cont", time.Since(now)) - return - } - - for i := 0; i < executeNum; i++ { - resp, err := thirdClient.DeleteOutdatedData(ctx, &third.DeleteOutdatedDataReq{ExpireTime: deleteTime.UnixMilli(), ObjectGroup: config.CronTask.DeleteObjectType}) - if err != nil { - log.ZError(ctx, "cron deleteoutDatedData failed", err, "deleteTime", deleteTime, "cont", time.Since(now)) - return - } - if resp.Count == 0 || resp.Count < int32(pageShowNumber) { - break - } - } - - log.ZDebug(ctx, "cron deleteoutDatedData success", "deltime", deleteTime, "cont", time.Since(now)) + if err := srv.registerDeleteMsg(); err != nil { + return err } - if _, err := crontab.AddFunc(config.CronTask.CronExecuteTime, deleteObjectFunc); err != nil { - return errs.Wrap(err) + if err := srv.registerClearUserMsg(); err != nil { + return err } - log.ZDebug(ctx, "start cron task", "CronExecuteTime", config.CronTask.CronExecuteTime) - crontab.Start() + srv.cron.Start() <-ctx.Done() return nil } + +type cronServer struct { + ctx context.Context + config *CronTaskConfig + cron *cron.Cron + msgClient msg.MsgClient + conversationClient pbconversation.ConversationClient + thirdClient third.ThirdClient +} + +func (c *cronServer) registerClearS3() error { + if c.config.CronTask.FileExpireTime <= 0 || len(c.config.CronTask.DeleteObjectType) == 0 { + log.ZInfo(c.ctx, "disable scheduled cleanup of s3", "fileExpireTime", c.config.CronTask.FileExpireTime, "deleteObjectType", c.config.CronTask.DeleteObjectType) + return nil + } + _, err := c.cron.AddFunc(c.config.CronTask.CronExecuteTime, c.clearS3) + return errs.WrapMsg(err, "failed to register clear s3 cron task") +} + +func (c *cronServer) registerDeleteMsg() error { + if c.config.CronTask.RetainChatRecords <= 0 { + log.ZInfo(c.ctx, "disable scheduled cleanup of chat records", "retainChatRecords", c.config.CronTask.RetainChatRecords) + return nil + } + _, err := c.cron.AddFunc(c.config.CronTask.CronExecuteTime, c.deleteMsg) + return errs.WrapMsg(err, "failed to register delete msg cron task") +} + +func (c *cronServer) registerClearUserMsg() error { + _, err := c.cron.AddFunc(c.config.CronTask.CronExecuteTime, c.clearUserMsg) + return errs.WrapMsg(err, "failed to register clear user msg cron task") +} diff --git a/internal/tools/cron_test.go b/internal/tools/cron_test.go new file mode 100644 index 0000000000..8903490698 --- /dev/null +++ b/internal/tools/cron_test.go @@ -0,0 +1,63 @@ +package tools + +import ( + "context" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" + kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister" + pbconversation "github.com/openimsdk/protocol/conversation" + "github.com/openimsdk/protocol/msg" + "github.com/openimsdk/protocol/third" + "github.com/openimsdk/tools/mcontext" + "github.com/openimsdk/tools/mw" + "github.com/robfig/cron/v3" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + "testing" +) + +func TestName(t *testing.T) { + conf := &config.Discovery{ + Enable: config.ETCD, + Etcd: config.Etcd{ + RootDirectory: "openim", + Address: []string{"localhost:12379"}, + }, + } + client, err := kdisc.NewDiscoveryRegister(conf, "source") + if err != nil { + panic(err) + } + client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials())) + ctx := mcontext.SetOpUserID(context.Background(), "imAdmin") + msgConn, err := client.GetConn(ctx, "msg-rpc-service") + if err != nil { + panic(err) + } + thirdConn, err := client.GetConn(ctx, "third-rpc-service") + if err != nil { + panic(err) + } + + conversationConn, err := client.GetConn(ctx, "conversation-rpc-service") + if err != nil { + panic(err) + } + + srv := &cronServer{ + ctx: ctx, + config: &CronTaskConfig{ + CronTask: config.CronTask{ + RetainChatRecords: 1, + FileExpireTime: 1, + DeleteObjectType: []string{"msg-picture", "msg-file", "msg-voice", "msg-video", "msg-video-snapshot", "sdklog", ""}, + }, + }, + cron: cron.New(), + msgClient: msg.NewMsgClient(msgConn), + conversationClient: pbconversation.NewConversationClient(conversationConn), + thirdClient: third.NewThirdClient(thirdConn), + } + srv.deleteMsg() + //srv.clearS3() + //srv.clearUserMsg() +} diff --git a/internal/tools/msg.go b/internal/tools/msg.go new file mode 100644 index 0000000000..cc00cc5b83 --- /dev/null +++ b/internal/tools/msg.go @@ -0,0 +1,36 @@ +package tools + +import ( + "fmt" + "github.com/openimsdk/protocol/msg" + "github.com/openimsdk/tools/log" + "github.com/openimsdk/tools/mcontext" + "os" + "time" +) + +func (c *cronServer) deleteMsg() { + now := time.Now() + deltime := now.Add(-time.Hour * 24 * time.Duration(c.config.CronTask.RetainChatRecords)) + operationID := fmt.Sprintf("cron_msg_%d_%d", os.Getpid(), deltime.UnixMilli()) + ctx := mcontext.SetOperationID(c.ctx, operationID) + log.ZDebug(ctx, "Destruct chat records", "deltime", deltime, "timestamp", deltime.UnixMilli()) + const ( + deleteCount = 10000 + deleteLimit = 50 + ) + var count int + for i := 1; i <= deleteCount; i++ { + ctx := mcontext.SetOperationID(c.ctx, fmt.Sprintf("%s_%d", operationID, i)) + resp, err := c.msgClient.DestructMsgs(ctx, &msg.DestructMsgsReq{Timestamp: deltime.UnixMilli(), Limit: deleteLimit}) + if err != nil { + log.ZError(ctx, "cron destruct chat records failed", err) + break + } + count += int(resp.Count) + if resp.Count < deleteLimit { + break + } + } + log.ZDebug(ctx, "cron destruct chat records end", "deltime", deltime, "cont", time.Since(now), "count", count) +} diff --git a/internal/tools/s3.go b/internal/tools/s3.go new file mode 100644 index 0000000000..9b6b9c4089 --- /dev/null +++ b/internal/tools/s3.go @@ -0,0 +1,79 @@ +package tools + +import ( + "fmt" + "github.com/openimsdk/protocol/third" + "github.com/openimsdk/tools/log" + "github.com/openimsdk/tools/mcontext" + "os" + "time" +) + +func (c *cronServer) clearS3() { + start := time.Now() + deleteTime := start.Add(-time.Hour * 24 * time.Duration(c.config.CronTask.FileExpireTime)) + operationID := fmt.Sprintf("cron_s3_%d_%d", os.Getpid(), deleteTime.UnixMilli()) + ctx := mcontext.SetOperationID(c.ctx, operationID) + log.ZDebug(ctx, "deleteoutDatedData", "deletetime", deleteTime, "timestamp", deleteTime.UnixMilli()) + const ( + deleteCount = 10000 + deleteLimit = 100 + ) + + var count int + for i := 1; i <= deleteCount; i++ { + resp, err := c.thirdClient.DeleteOutdatedData(ctx, &third.DeleteOutdatedDataReq{ExpireTime: deleteTime.UnixMilli(), ObjectGroup: c.config.CronTask.DeleteObjectType, Limit: deleteLimit}) + if err != nil { + log.ZError(ctx, "cron deleteoutDatedData failed", err) + return + } + count += int(resp.Count) + if resp.Count < deleteLimit { + break + } + } + log.ZDebug(ctx, "cron deleteoutDatedData success", "deltime", deleteTime, "cont", time.Since(start), "count", count) +} + +// var req *third.DeleteOutdatedDataReq +// count1, err := ExtractField(ctx, c.thirdClient.DeleteOutdatedData, req, (*third.DeleteOutdatedDataResp).GetCount) +// +// c.thirdClient.DeleteOutdatedData(ctx, &third.DeleteOutdatedDataReq{}) +// msggateway.GetUsersOnlineStatusCaller.Invoke(ctx, &msggateway.GetUsersOnlineStatusReq{}) +// +// var cli ThirdClient +// +// c111, err := cli.DeleteOutdatedData(ctx, 100) +// +// cli.ThirdClient.DeleteOutdatedData(ctx, &third.DeleteOutdatedDataReq{}) +// +// cli.AuthSign(ctx, &third.AuthSignReq{}) +// +// cli.SetAppBadge() +// +//} +// +//func extractField[A, B, C any](ctx context.Context, fn func(ctx context.Context, req *A, opts ...grpc.CallOption) (*B, error), req *A, get func(*B) C) (C, error) { +// resp, err := fn(ctx, req) +// if err != nil { +// var c C +// return c, err +// } +// return get(resp), nil +//} +// +//func ignore(_ any, err error) error { +// return err +//} +// +//type ThirdClient struct { +// third.ThirdClient +//} +// +//func (c *ThirdClient) DeleteOutdatedData(ctx context.Context, expireTime int64) (int32, error) { +// return extractField(ctx, c.ThirdClient.DeleteOutdatedData, &third.DeleteOutdatedDataReq{ExpireTime: expireTime}, (*third.DeleteOutdatedDataResp).GetCount) +//} +// +//func (c *ThirdClient) DeleteOutdatedData1(ctx context.Context, expireTime int64) error { +// return ignore(c.ThirdClient.DeleteOutdatedData(ctx, &third.DeleteOutdatedDataReq{ExpireTime: expireTime})) +//} diff --git a/internal/tools/user_msg.go b/internal/tools/user_msg.go new file mode 100644 index 0000000000..a4afa769ed --- /dev/null +++ b/internal/tools/user_msg.go @@ -0,0 +1,34 @@ +package tools + +import ( + "fmt" + pbconversation "github.com/openimsdk/protocol/conversation" + "github.com/openimsdk/tools/log" + "github.com/openimsdk/tools/mcontext" + "os" + "time" +) + +func (c *cronServer) clearUserMsg() { + now := time.Now() + operationID := fmt.Sprintf("cron_user_msg_%d_%d", os.Getpid(), now.UnixMilli()) + ctx := mcontext.SetOperationID(c.ctx, operationID) + log.ZDebug(ctx, "clear user msg cron start") + const ( + deleteCount = 10000 + deleteLimit = 100 + ) + var count int + for i := 1; i <= deleteCount; i++ { + resp, err := c.conversationClient.ClearUserConversationMsg(ctx, &pbconversation.ClearUserConversationMsgReq{Timestamp: now.UnixMilli(), Limit: deleteLimit}) + if err != nil { + log.ZError(ctx, "ClearUserConversationMsg failed.", err) + return + } + count += int(resp.Count) + if resp.Count < deleteLimit { + break + } + } + log.ZDebug(ctx, "clear user msg cron task completed", "cont", time.Since(now), "count", count) +} diff --git a/pkg/common/storage/controller/conversation.go b/pkg/common/storage/controller/conversation.go index bf41cce957..d4088e0c0e 100644 --- a/pkg/common/storage/controller/conversation.go +++ b/pkg/common/storage/controller/conversation.go @@ -74,6 +74,8 @@ type ConversationDatabase interface { GetNotNotifyConversationIDs(ctx context.Context, userID string) ([]string, error) // GetPinnedConversationIDs gets pinned conversationIDs by userID GetPinnedConversationIDs(ctx context.Context, userID string) ([]string, error) + // FindRandConversation finds random conversations based on the specified timestamp and limit. + FindRandConversation(ctx context.Context, ts int64, limit int) ([]*relationtb.Conversation, error) } func NewConversationDatabase(conversation database.Conversation, cache cache.ConversationCache, tx tx.Tx) ConversationDatabase { @@ -401,3 +403,7 @@ func (c *conversationDatabase) GetPinnedConversationIDs(ctx context.Context, use } return conversationIDs, nil } + +func (c *conversationDatabase) FindRandConversation(ctx context.Context, ts int64, limit int) ([]*relationtb.Conversation, error) { + return c.conversationDB.FindRandConversation(ctx, ts, limit) +} diff --git a/pkg/common/storage/controller/msg.go b/pkg/common/storage/controller/msg.go index 3c3cd9671b..c29544c333 100644 --- a/pkg/common/storage/controller/msg.go +++ b/pkg/common/storage/controller/msg.go @@ -18,7 +18,6 @@ import ( "context" "encoding/json" "errors" - "strings" "time" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" @@ -69,6 +68,7 @@ type CommonMsgDatabase interface { GetMaxSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error) GetMaxSeq(ctx context.Context, conversationID string) (int64, error) SetMinSeqs(ctx context.Context, seqs map[string]int64) error + SetMinSeq(ctx context.Context, conversationID string, seq int64) error SetUserConversationsMinSeqs(ctx context.Context, userID string, seqs map[string]int64) (err error) SetHasReadSeq(ctx context.Context, userID string, conversationID string, hasReadSeq int64) error @@ -95,13 +95,16 @@ type CommonMsgDatabase interface { ConvertMsgsDocLen(ctx context.Context, conversationIDs []string) // get Msg when destruct msg before - GetBeforeMsg(ctx context.Context, ts int64, docIds []string, limit int) ([]*model.MsgDocModel, error) - DeleteDocMsgBefore(ctx context.Context, ts int64, doc *model.MsgDocModel) ([]int, error) + //DeleteDocMsgBefore(ctx context.Context, ts int64, doc *model.MsgDocModel) ([]int, error) - GetDocIDs(ctx context.Context) ([]string, error) + GetRandBeforeMsg(ctx context.Context, ts int64, limit int) ([]*model.MsgDocModel, error) SetUserConversationsMaxSeq(ctx context.Context, conversationID string, userID string, seq int64) error SetUserConversationsMinSeq(ctx context.Context, conversationID string, userID string, seq int64) error + + DeleteDoc(ctx context.Context, docID string) error + + GetLastMessageSeqByTime(ctx context.Context, conversationID string, time int64) (int64, error) } func NewCommonMsgDatabase(msgDocModel database.Msg, msg cache.MsgCache, seqUser cache.SeqUser, seqConversation cache.SeqConversationCache, kafkaConf *config.Kafka) (CommonMsgDatabase, error) { @@ -806,9 +809,10 @@ func (db *commonMsgDatabase) GetMaxSeq(ctx context.Context, conversationID strin return db.seqConversation.GetMaxSeq(ctx, conversationID) } -func (db *commonMsgDatabase) SetMinSeq(ctx context.Context, conversationID string, minSeq int64) error { - return db.seqConversation.SetMinSeq(ctx, conversationID, minSeq) -} +// +//func (db *commonMsgDatabase) SetMinSeq(ctx context.Context, conversationID string, minSeq int64) error { +// return db.seqConversation.SetMinSeq(ctx, conversationID, minSeq) +//} func (db *commonMsgDatabase) SetMinSeqs(ctx context.Context, seqs map[string]int64) error { return db.seqConversation.SetMinSeqs(ctx, seqs) @@ -947,56 +951,40 @@ func (db *commonMsgDatabase) ConvertMsgsDocLen(ctx context.Context, conversation db.msgDocDatabase.ConvertMsgsDocLen(ctx, conversationIDs) } -func (db *commonMsgDatabase) GetBeforeMsg(ctx context.Context, ts int64, docIDs []string, limit int) ([]*model.MsgDocModel, error) { - var msgs []*model.MsgDocModel - for i := 0; i < len(docIDs); i += 1000 { - end := i + 1000 - if end > len(docIDs) { - end = len(docIDs) - } - - res, err := db.msgDocDatabase.GetBeforeMsg(ctx, ts, docIDs[i:end], limit) - if err != nil { - return nil, err - } - msgs = append(msgs, res...) - - if len(msgs) >= limit { - return msgs[:limit], nil - } - } - return msgs, nil +func (db *commonMsgDatabase) GetRandBeforeMsg(ctx context.Context, ts int64, limit int) ([]*model.MsgDocModel, error) { + return db.msgDocDatabase.GetRandBeforeMsg(ctx, ts, limit) } -func (db *commonMsgDatabase) DeleteDocMsgBefore(ctx context.Context, ts int64, doc *model.MsgDocModel) ([]int, error) { - var notNull int - index := make([]int, 0, len(doc.Msg)) - for i, message := range doc.Msg { - if message.Msg != nil { - notNull++ - if message.Msg.SendTime < ts { - index = append(index, i) - } - } - } - if len(index) == 0 { - return index, nil - } - maxSeq := doc.Msg[index[len(index)-1]].Msg.Seq - conversationID := doc.DocID[:strings.LastIndex(doc.DocID, ":")] - if err := db.setMinSeq(ctx, conversationID, maxSeq+1); err != nil { - return index, err - } - if len(index) == notNull { - log.ZDebug(ctx, "Delete db in Doc", "DocID", doc.DocID, "index", index, "maxSeq", maxSeq) - return index, db.msgDocDatabase.DeleteDoc(ctx, doc.DocID) - } else { - log.ZDebug(ctx, "delete db in index", "DocID", doc.DocID, "index", index, "maxSeq", maxSeq) - return index, db.msgDocDatabase.DeleteMsgByIndex(ctx, doc.DocID, index) - } -} - -func (db *commonMsgDatabase) setMinSeq(ctx context.Context, conversationID string, seq int64) error { +// +//func (db *commonMsgDatabase) DeleteDocMsgBefore(ctx context.Context, ts int64, doc *model.MsgDocModel) ([]int, error) { +// var notNull int +// index := make([]int, 0, len(doc.Msg)) +// for i, message := range doc.Msg { +// if message.Msg != nil { +// notNull++ +// if message.Msg.SendTime < ts { +// index = append(index, i) +// } +// } +// } +// if len(index) == 0 { +// return index, nil +// } +// maxSeq := doc.Msg[index[len(index)-1]].Msg.Seq +// conversationID := doc.DocID[:strings.LastIndex(doc.DocID, ":")] +// if err := db.SetMinSeq(ctx, conversationID, maxSeq+1); err != nil { +// return index, err +// } +// if len(index) == notNull { +// log.ZDebug(ctx, "Delete db in Doc", "DocID", doc.DocID, "index", index, "maxSeq", maxSeq) +// return index, db.msgDocDatabase.DeleteDoc(ctx, doc.DocID) +// } else { +// log.ZDebug(ctx, "delete db in index", "DocID", doc.DocID, "index", index, "maxSeq", maxSeq) +// return index, db.msgDocDatabase.DeleteMsgByIndex(ctx, doc.DocID, index) +// } +//} + +func (db *commonMsgDatabase) SetMinSeq(ctx context.Context, conversationID string, seq int64) error { dbSeq, err := db.seqConversation.GetMinSeq(ctx, conversationID) if err != nil { if errors.Is(errs.Unwrap(err), redis.Nil) { @@ -1010,8 +998,8 @@ func (db *commonMsgDatabase) setMinSeq(ctx context.Context, conversationID strin return db.seqConversation.SetMinSeq(ctx, conversationID, seq) } -func (db *commonMsgDatabase) GetDocIDs(ctx context.Context) ([]string, error) { - return db.msgDocDatabase.GetDocIDs(ctx) +func (db *commonMsgDatabase) GetRandDocIDs(ctx context.Context, limit int) ([]string, error) { + return db.msgDocDatabase.GetRandDocIDs(ctx, limit) } func (db *commonMsgDatabase) GetCacheMaxSeqWithTime(ctx context.Context, conversationIDs []string) (map[string]database.SeqTime, error) { @@ -1026,3 +1014,11 @@ func (db *commonMsgDatabase) GetMaxSeqsWithTime(ctx context.Context, conversatio // todo: only the time in the redis cache will be taken, not the message time return db.seqConversation.GetMaxSeqsWithTime(ctx, conversationIDs) } + +func (db *commonMsgDatabase) DeleteDoc(ctx context.Context, docID string) error { + return db.msgDocDatabase.DeleteDoc(ctx, docID) +} + +func (db *commonMsgDatabase) GetLastMessageSeqByTime(ctx context.Context, conversationID string, time int64) (int64, error) { + return db.msgDocDatabase.GetLastMessageSeqByTime(ctx, conversationID, time) +} diff --git a/pkg/common/storage/controller/s3.go b/pkg/common/storage/controller/s3.go index 4e5ad18b6e..6693d2ddea 100644 --- a/pkg/common/storage/controller/s3.go +++ b/pkg/common/storage/controller/s3.go @@ -24,7 +24,6 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" - "github.com/openimsdk/tools/db/pagination" "github.com/openimsdk/tools/s3" "github.com/openimsdk/tools/s3/cont" "github.com/redis/go-redis/v9" @@ -40,11 +39,10 @@ type S3Database interface { SetObject(ctx context.Context, info *model.Object) error StatObject(ctx context.Context, name string) (*s3.ObjectInfo, error) FormData(ctx context.Context, name string, size int64, contentType string, duration time.Duration) (*s3.FormData, error) - FindNeedDeleteObjectByDB(ctx context.Context, duration time.Time, needDelType []string, pagination pagination.Pagination) (total int64, objects []*model.Object, err error) - DeleteObject(ctx context.Context, name string) error - DeleteSpecifiedData(ctx context.Context, engine string, name string) error - FindModelsByKey(ctx context.Context, key string) (objects []*model.Object, err error) + FindExpirationObject(ctx context.Context, engine string, expiration time.Time, needDelType []string, count int64) ([]*model.Object, error) + DeleteSpecifiedData(ctx context.Context, engine string, name []string) error DelS3Key(ctx context.Context, engine string, keys ...string) error + GetKeyCount(ctx context.Context, engine string, key string) (int64, error) } func NewS3Database(rdb redis.UniversalClient, s3 s3.Interface, obj database.ObjectInfo) S3Database { @@ -120,19 +118,17 @@ func (s *s3Database) StatObject(ctx context.Context, name string) (*s3.ObjectInf func (s *s3Database) FormData(ctx context.Context, name string, size int64, contentType string, duration time.Duration) (*s3.FormData, error) { return s.s3.FormData(ctx, name, size, contentType, duration) } -func (s *s3Database) FindNeedDeleteObjectByDB(ctx context.Context, duration time.Time, needDelType []string, pagination pagination.Pagination) (total int64, objects []*model.Object, err error) { - return s.db.FindNeedDeleteObjectByDB(ctx, duration, needDelType, pagination) -} -func (s *s3Database) DeleteObject(ctx context.Context, name string) error { - return s.s3.DeleteObject(ctx, name) +func (s *s3Database) FindExpirationObject(ctx context.Context, engine string, expiration time.Time, needDelType []string, count int64) ([]*model.Object, error) { + return s.db.FindExpirationObject(ctx, engine, expiration, needDelType, count) } -func (s *s3Database) DeleteSpecifiedData(ctx context.Context, engine string, name string) error { - return s.db.Delete(ctx, engine, name) + +func (s *s3Database) GetKeyCount(ctx context.Context, engine string, key string) (int64, error) { + return s.db.GetKeyCount(ctx, engine, key) } -func (s *s3Database) FindModelsByKey(ctx context.Context, key string) (objects []*model.Object, err error) { - return s.db.FindModelsByKey(ctx, key) +func (s *s3Database) DeleteSpecifiedData(ctx context.Context, engine string, name []string) error { + return s.db.Delete(ctx, engine, name) } func (s *s3Database) DelS3Key(ctx context.Context, engine string, keys ...string) error { diff --git a/pkg/common/storage/database/conversation.go b/pkg/common/storage/database/conversation.go index 30ca01ee71..1fb53cfed2 100644 --- a/pkg/common/storage/database/conversation.go +++ b/pkg/common/storage/database/conversation.go @@ -42,4 +42,5 @@ type Conversation interface { GetConversationIDsNeedDestruct(ctx context.Context) ([]*model.Conversation, error) GetConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) ([]string, error) FindConversationUserVersion(ctx context.Context, userID string, version uint, limit int) (*model.VersionLog, error) + FindRandConversation(ctx context.Context, ts int64, limit int) ([]*model.Conversation, error) } diff --git a/pkg/common/storage/database/mgo/conversation.go b/pkg/common/storage/database/mgo/conversation.go index 10e223c893..851ec99c40 100644 --- a/pkg/common/storage/database/mgo/conversation.go +++ b/pkg/common/storage/database/mgo/conversation.go @@ -228,3 +228,35 @@ func (c *ConversationMgo) GetConversationNotReceiveMessageUserIDs(ctx context.Co func (c *ConversationMgo) FindConversationUserVersion(ctx context.Context, userID string, version uint, limit int) (*model.VersionLog, error) { return c.version.FindChangeLog(ctx, userID, version, limit) } + +func (c *ConversationMgo) FindRandConversation(ctx context.Context, ts int64, limit int) ([]*model.Conversation, error) { + pipeline := []bson.M{ + { + "$match": bson.M{ + "is_msg_destruct": true, + "msg_destruct_time": bson.M{"$ne": 0}, + }, + }, + { + "$addFields": bson.M{ + "next_msg_destruct_timestamp": bson.M{ + "$add": []any{ + bson.M{ + "$toLong": "$latest_msg_destruct_time", + }, "$msg_destruct_time"}, + }, + }, + }, + { + "$match": bson.M{ + "next_msg_destruct_timestamp": bson.M{"$lt": ts}, + }, + }, + { + "$sample": bson.M{ + "size": limit, + }, + }, + } + return mongoutil.Aggregate[*model.Conversation](ctx, c.coll, pipeline) +} diff --git a/pkg/common/storage/database/mgo/msg.go b/pkg/common/storage/database/mgo/msg.go index fc1fe47eab..f371766958 100644 --- a/pkg/common/storage/database/mgo/msg.go +++ b/pkg/common/storage/database/mgo/msg.go @@ -1227,8 +1227,7 @@ func (m *MsgMgo) ConvertMsgsDocLen(ctx context.Context, conversationIDs []string } } -func (m *MsgMgo) GetDocIDs(ctx context.Context) ([]string, error) { - limit := 5000 +func (m *MsgMgo) GetRandDocIDs(ctx context.Context, limit int) ([]string, error) { var skip int var docIDs []string var offset int @@ -1267,15 +1266,18 @@ func (m *MsgMgo) GetDocIDs(ctx context.Context) ([]string, error) { return docIDs, errs.Wrap(err) } -func (m *MsgMgo) GetBeforeMsg(ctx context.Context, ts int64, docIDs []string, limit int) ([]*model.MsgDocModel, error) { +func (m *MsgMgo) GetRandBeforeMsg(ctx context.Context, ts int64, limit int) ([]*model.MsgDocModel, error) { return mongoutil.Aggregate[*model.MsgDocModel](ctx, m.coll, []bson.M{ { "$match": bson.M{ - "doc_id": bson.M{ - "$in": docIDs, - }, - "msgs.msg.send_time": bson.M{ - "$lt": ts, + "msgs": bson.M{ + "$not": bson.M{ + "$elemMatch": bson.M{ + "msg.send_time": bson.M{ + "$gt": ts, + }, + }, + }, }, }, }, @@ -1288,7 +1290,9 @@ func (m *MsgMgo) GetBeforeMsg(ctx context.Context, ts int64, docIDs []string, li }, }, { - "$limit": limit, + "$sample": bson.M{ + "size": limit, + }, }, }) } @@ -1305,53 +1309,58 @@ func (m *MsgMgo) DeleteMsgByIndex(ctx context.Context, docID string, index []int return mongoutil.UpdateOne(ctx, m.coll, bson.M{"doc_id": docID}, bson.M{"$set": set}, true) } -//func (m *MsgMgo) ClearMsg(ctx context.Context, t time.Time) (int64, error) { -// ts := t.UnixMilli() -// var count int64 -// for { -// msgs, err := m.GetBeforeMsg(ctx, ts, 100) -// if err != nil { -// return count, err -// } -// if len(msgs) == 0 { -// return count, nil -// } -// for _, msg := range msgs { -// num, err := m.deleteOneMsg(ctx, ts, msg) -// count += num -// if err != nil { -// return count, err -// } -// } -// } -//} - func (m *MsgMgo) DeleteDoc(ctx context.Context, docID string) error { return mongoutil.DeleteOne(ctx, m.coll, bson.M{"doc_id": docID}) } -//func (m *MsgMgo) DeleteDocMsg(ctx context.Context, ts int64, doc *relation.MsgDocModel) (int64, error) { -// var notNull int -// index := make([]int, 0, len(doc.Msg)) -// for i, message := range doc.Msg { -// if message.Msg != nil { -// notNull++ -// if message.Msg.SendTime < ts { -// index = append(index, i) -// } -// } -// } -// if len(index) == 0 { -// return 0, errs.New("no msg to delete").WrapMsg("deleteOneMsg", "docID", doc.DocID) -// } -// if len(index) == notNull { -// if err := m.DeleteDoc(ctx, doc.DocID); err != nil { -// return 0, err -// } -// } else { -// if err := m.setNullMsg(ctx, doc.DocID, index); err != nil { -// return 0, err -// } -// } -// return int64(len(index)), nil -//} +func (m *MsgMgo) GetLastMessageSeqByTime(ctx context.Context, conversationID string, time int64) (int64, error) { + pipeline := []bson.M{ + { + "$match": bson.M{ + "doc_id": bson.M{ + "$regex": fmt.Sprintf("^%s", conversationID), + }, + }, + }, + { + "$match": bson.M{ + "msgs.msg.send_time": bson.M{ + "$lte": time, + }, + }, + }, + { + "$sort": bson.M{ + "_id": -1, + }, + }, + { + "$limit": 1, + }, + { + "$project": bson.M{ + "_id": 0, + "doc_id": 1, + "msgs.msg.send_time": 1, + "msgs.msg.seq": 1, + }, + }, + } + res, err := mongoutil.Aggregate[*model.MsgDocModel](ctx, m.coll, pipeline) + if err != nil { + return 0, err + } + if len(res) == 0 { + return 0, nil + } + var seq int64 + for _, v := range res[0].Msg { + if v.Msg == nil { + continue + } + if v.Msg.SendTime <= time { + seq = v.Msg.Seq + } + } + return seq, nil +} diff --git a/pkg/common/storage/database/mgo/msg_test.go b/pkg/common/storage/database/mgo/msg_test.go index 5aed4dc511..992090552e 100644 --- a/pkg/common/storage/database/mgo/msg_test.go +++ b/pkg/common/storage/database/mgo/msg_test.go @@ -3,12 +3,11 @@ package mgo import ( "context" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" - "github.com/openimsdk/protocol/msg" - "github.com/openimsdk/protocol/sdkws" "github.com/openimsdk/tools/db/mongoutil" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" + "math" "math/rand" "strconv" "testing" @@ -16,35 +15,45 @@ import ( ) func TestName1(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*300) - defer cancel() - cli := Result(mongo.Connect(ctx, options.Client().ApplyURI("mongodb://openIM:openIM123@172.16.8.48:37017/openim_v3?maxPoolSize=100").SetConnectTimeout(5*time.Second))) - - v := &MsgMgo{ - coll: cli.Database("openim_v3").Collection("msg3"), - } - - req := &msg.SearchMessageReq{ - //RecvID: "3187706596", - //SendID: "7009965934", - ContentType: 101, - //SendTime: "2024-05-06", - //SessionType: 3, - Pagination: &sdkws.RequestPagination{ - PageNumber: 1, - ShowNumber: 10, - }, - } - total, res, err := v.SearchMessage(ctx, req) - if err != nil { - panic(err) - } - - for i, re := range res { - t.Logf("%d => %d | %+v", i+1, re.Msg.Seq, re.Msg.Content) - } - - t.Log(total) + //ctx, cancel := context.WithTimeout(context.Background(), time.Second*300) + //defer cancel() + //cli := Result(mongo.Connect(ctx, options.Client().ApplyURI("mongodb://openIM:openIM123@172.16.8.66:37017/openim_v3?maxPoolSize=100").SetConnectTimeout(5*time.Second))) + // + //v := &MsgMgo{ + // coll: cli.Database("openim_v3").Collection("msg3"), + //} + // + //req := &msg.SearchMessageReq{ + // //RecvID: "3187706596", + // //SendID: "7009965934", + // ContentType: 101, + // //SendTime: "2024-05-06", + // //SessionType: 3, + // Pagination: &sdkws.RequestPagination{ + // PageNumber: 1, + // ShowNumber: 10, + // }, + //} + //total, res, err := v.SearchMessage(ctx, req) + //if err != nil { + // panic(err) + //} + // + //for i, re := range res { + // t.Logf("%d => %d | %+v", i+1, re.Msg.Seq, re.Msg.Content) + //} + // + //t.Log(total) + // + //msg, err := NewMsgMongo(cli.Database("openim_v3")) + //if err != nil { + // panic(err) + //} + //res, err := msg.GetBeforeMsg(ctx, time.Now().UnixMilli(), []string{"1:0"}, 1000) + //if err != nil { + // panic(err) + //} + //t.Log(len(res)) } func TestName10(t *testing.T) { @@ -73,3 +82,33 @@ func TestName10(t *testing.T) { } } + +func TestName3(t *testing.T) { + t.Log(uint64(math.MaxUint64)) + t.Log(int64(math.MaxInt64)) + + t.Log(int64(math.MinInt64)) +} + +func TestName4(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*300) + defer cancel() + cli := Result(mongo.Connect(ctx, options.Client().ApplyURI("mongodb://openIM:openIM123@172.16.8.66:37017/openim_v3?maxPoolSize=100").SetConnectTimeout(5*time.Second))) + + msg, err := NewMsgMongo(cli.Database("openim_v3")) + if err != nil { + panic(err) + } + ts := time.Now().Add(-time.Hour * 24 * 5).UnixMilli() + t.Log(ts) + res, err := msg.GetLastMessageSeqByTime(ctx, "sg_1523453548", ts) + if err != nil { + panic(err) + } + t.Log(res) +} + +func TestName5(t *testing.T) { + var v time.Time + t.Log(v.UnixMilli()) +} diff --git a/pkg/common/storage/database/mgo/object.go b/pkg/common/storage/database/mgo/object.go index 5bc329d33b..624eb84a0d 100644 --- a/pkg/common/storage/database/mgo/object.go +++ b/pkg/common/storage/database/mgo/object.go @@ -22,7 +22,6 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" "github.com/openimsdk/tools/db/mongoutil" - "github.com/openimsdk/tools/db/pagination" "github.com/openimsdk/tools/errs" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" @@ -91,21 +90,25 @@ func (o *S3Mongo) Take(ctx context.Context, engine string, name string) (*model. return mongoutil.FindOne[*model.Object](ctx, o.coll, bson.M{"name": name, "engine": engine}) } -func (o *S3Mongo) Delete(ctx context.Context, engine string, name string) error { - return mongoutil.DeleteOne(ctx, o.coll, bson.M{"name": name, "engine": engine}) +func (o *S3Mongo) Delete(ctx context.Context, engine string, name []string) error { + if len(name) == 0 { + return nil + } + return mongoutil.DeleteOne(ctx, o.coll, bson.M{"engine": engine, "name": bson.M{"$in": name}}) } -// Find Expires object -func (o *S3Mongo) FindNeedDeleteObjectByDB(ctx context.Context, duration time.Time, needDelType []string, pagination pagination.Pagination) (total int64, objects []*model.Object, err error) { - return mongoutil.FindPage[*model.Object](ctx, o.coll, bson.M{ - "create_time": bson.M{"$lt": duration}, +func (o *S3Mongo) FindExpirationObject(ctx context.Context, engine string, expiration time.Time, needDelType []string, count int64) ([]*model.Object, error) { + opt := options.Find() + if count > 0 { + opt.SetLimit(count) + } + return mongoutil.Find[*model.Object](ctx, o.coll, bson.M{ + "engine": engine, + "create_time": bson.M{"$lt": expiration}, "group": bson.M{"$in": needDelType}, - }, pagination) + }, opt) } -// Find object by key -func (o *S3Mongo) FindModelsByKey(ctx context.Context, key string) (objects []*model.Object, err error) { - return mongoutil.Find[*model.Object](ctx, o.coll, bson.M{ - "key": key, - }) +func (o *S3Mongo) GetKeyCount(ctx context.Context, engine string, key string) (int64, error) { + return mongoutil.Count(ctx, o.coll, bson.M{"engine": engine, "key": key}) } diff --git a/pkg/common/storage/database/msg.go b/pkg/common/storage/database/msg.go index 23a99f5b96..abb2a44c2f 100644 --- a/pkg/common/storage/database/msg.go +++ b/pkg/common/storage/database/msg.go @@ -45,7 +45,9 @@ type Msg interface { DeleteDoc(ctx context.Context, docID string) error DeleteMsgByIndex(ctx context.Context, docID string, index []int) error - GetBeforeMsg(ctx context.Context, ts int64, docIDs []string, limit int) ([]*model.MsgDocModel, error) + GetRandBeforeMsg(ctx context.Context, ts int64, limit int) ([]*model.MsgDocModel, error) - GetDocIDs(ctx context.Context) ([]string, error) + GetRandDocIDs(ctx context.Context, limit int) ([]string, error) + + GetLastMessageSeqByTime(ctx context.Context, conversationID string, time int64) (int64, error) } diff --git a/pkg/common/storage/database/object.go b/pkg/common/storage/database/object.go index c741e39a60..5541a159b4 100644 --- a/pkg/common/storage/database/object.go +++ b/pkg/common/storage/database/object.go @@ -19,13 +19,12 @@ import ( "time" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" - "github.com/openimsdk/tools/db/pagination" ) type ObjectInfo interface { SetObject(ctx context.Context, obj *model.Object) error Take(ctx context.Context, engine string, name string) (*model.Object, error) - Delete(ctx context.Context, engine string, name string) error - FindNeedDeleteObjectByDB(ctx context.Context, duration time.Time, needDelType []string, pagination pagination.Pagination) (total int64, objects []*model.Object, err error) - FindModelsByKey(ctx context.Context, key string) (objects []*model.Object, err error) + Delete(ctx context.Context, engine string, name []string) error + FindExpirationObject(ctx context.Context, engine string, expiration time.Time, needDelType []string, count int64) ([]*model.Object, error) + GetKeyCount(ctx context.Context, engine string, key string) (int64, error) }