Skip to content

Commit

Permalink
feat: Optimize Scheduled Task (openimsdk#2985)
Browse files Browse the repository at this point in the history
* pb

* fix: Modifying other fields while setting IsPrivateChat does not take effect

* fix: quote message error revoke

* refactoring scheduled tasks

* refactoring scheduled tasks

* refactoring scheduled tasks

* refactoring scheduled tasks

* refactoring scheduled tasks

* refactoring scheduled tasks
  • Loading branch information
withchao authored Dec 20, 2024
1 parent 04f3c99 commit e989506
Show file tree
Hide file tree
Showing 22 changed files with 640 additions and 393 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ require (
github.com/gorilla/websocket v1.5.1
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/mitchellh/mapstructure v1.5.0
github.com/openimsdk/protocol v0.0.72-alpha.66
github.com/openimsdk/tools v0.0.50-alpha.57
github.com/openimsdk/protocol v0.0.72-alpha.67
github.com/openimsdk/tools v0.0.50-alpha.58
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_golang v1.18.0
github.com/stretchr/testify v1.9.0
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -347,10 +347,10 @@ github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y=
github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM=
github.com/openimsdk/gomake v0.0.15-alpha.2 h1:5Q8yl8ezy2yx+q8/ucU/t4kJnDfCzNOrkXcDACCqtyM=
github.com/openimsdk/gomake v0.0.15-alpha.2/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI=
github.com/openimsdk/protocol v0.0.72-alpha.66 h1:5KoDY6M4T+pXg449ScF6hqeQ+WenBwNyUJn/t8W0oBQ=
github.com/openimsdk/protocol v0.0.72-alpha.66/go.mod h1:Iet+piS/jaS+kWWyj6EEr36mk4ISzIRYjoMSVA4dq2M=
github.com/openimsdk/tools v0.0.50-alpha.57 h1:oIKV6vYhqp7TRmZ6Pe+r9RNl1D5s7aB/kE9yQVEWcSY=
github.com/openimsdk/tools v0.0.50-alpha.57/go.mod h1:muCtxguNJv8lFwLei27UASu2Nvg4ERSeN0R4K5tivk0=
github.com/openimsdk/protocol v0.0.72-alpha.67 h1:zlLbVkoT0OYsjO2RCutQuDFllcfNvZfdYchvlR6UIe0=
github.com/openimsdk/protocol v0.0.72-alpha.67/go.mod h1:Iet+piS/jaS+kWWyj6EEr36mk4ISzIRYjoMSVA4dq2M=
github.com/openimsdk/tools v0.0.50-alpha.58 h1:hkFL02Bzzp/l5x+tb7kJ9zes7hilh65EQ4qEIthsQX4=
github.com/openimsdk/tools v0.0.50-alpha.58/go.mod h1:muCtxguNJv8lFwLei27UASu2Nvg4ERSeN0R4K5tivk0=
github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM=
github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs=
github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ=
Expand Down
1 change: 1 addition & 0 deletions internal/api/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ func newGinRouter(disCov discovery.SvcDiscoveryRegistry, config *Config) *gin.En
authRouterGroup.POST("/get_user_token", a.GetUserToken)
authRouterGroup.POST("/parse_token", a.ParseToken)
authRouterGroup.POST("/force_logout", a.ForceLogout)

}
// Third service
thirdGroup := r.Group("/third")
Expand Down
50 changes: 50 additions & 0 deletions internal/rpc/conversation/conversation.go
Original file line number Diff line number Diff line change
Expand Up @@ -763,3 +763,53 @@ func (c *conversationServer) GetPinnedConversationIDs(ctx context.Context, req *
}
return &pbconversation.GetPinnedConversationIDsResp{ConversationIDs: conversationIDs}, nil
}

func (c *conversationServer) ClearUserConversationMsg(ctx context.Context, req *pbconversation.ClearUserConversationMsgReq) (*pbconversation.ClearUserConversationMsgResp, error) {
conversations, err := c.conversationDatabase.FindRandConversation(ctx, req.Timestamp, int(req.Limit))
if err != nil {
return nil, err
}
latestMsgDestructTime := time.UnixMilli(req.Timestamp)
for i, conversation := range conversations {
if conversation.IsMsgDestruct == false || conversation.MsgDestructTime == 0 {
continue
}
rcpReq := &pbmsg.GetLastMessageSeqByTimeReq{ConversationID: conversation.ConversationID, Time: req.Timestamp - conversation.MsgDestructTime}
resp, err := pbmsg.GetLastMessageSeqByTime.Invoke(ctx, rcpReq)
if err != nil {
return nil, err
}
if resp.Seq <= 0 {
log.ZDebug(ctx, "ClearUserConversationMsg GetLastMessageSeqByTime seq <= 0", "index", i, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID, "msgDestructTime", conversation.MsgDestructTime, "seq", resp.Seq)
if err := c.setConversationMinSeqAndLatestMsgDestructTime(ctx, conversation.ConversationID, conversation.OwnerUserID, -1, latestMsgDestructTime); err != nil {
return nil, err
}
continue
}
resp.Seq++
if err := c.setConversationMinSeqAndLatestMsgDestructTime(ctx, conversation.ConversationID, conversation.OwnerUserID, resp.Seq, latestMsgDestructTime); err != nil {
return nil, err
}
log.ZDebug(ctx, "ClearUserConversationMsg set min seq", "index", i, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID, "seq", resp.Seq, "msgDestructTime", conversation.MsgDestructTime)
}
return &pbconversation.ClearUserConversationMsgResp{Count: int32(len(conversations))}, nil
}

func (c *conversationServer) setConversationMinSeqAndLatestMsgDestructTime(ctx context.Context, conversationID string, ownerUserID string, minSeq int64, latestMsgDestructTime time.Time) error {
update := map[string]any{
"latest_msg_destruct_time": latestMsgDestructTime,
}
if minSeq >= 0 {
req := &pbmsg.SetUserConversationMinSeqReq{ConversationID: conversationID, OwnerUserID: []string{ownerUserID}, MinSeq: minSeq}
if _, err := pbmsg.SetUserConversationMinSeqCaller.Invoke(ctx, req); err != nil {
return err
}
update["min_seq"] = minSeq
}

if err := c.conversationDatabase.UpdateUsersConversationField(ctx, []string{ownerUserID}, conversationID, update); err != nil {
return err
}
c.conversationNotificationSender.ConversationChangeNotification(ctx, ownerUserID, []string{conversationID})
return nil
}
85 changes: 40 additions & 45 deletions internal/rpc/msg/clear.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@ package msg

import (
"context"
"strings"
"time"

"github.com/openimsdk/open-im-server/v3/pkg/authverify"
"github.com/openimsdk/open-im-server/v3/pkg/common/convert"
pbconv "github.com/openimsdk/protocol/conversation"
"github.com/openimsdk/protocol/msg"
"github.com/openimsdk/protocol/wrapperspb"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/mcontext"
"github.com/openimsdk/tools/utils/datautil"
Expand All @@ -19,63 +19,50 @@ import (
)

// hard delete in Database.
func (m *msgServer) DestructMsgs(ctx context.Context, req *msg.DestructMsgsReq) (_ *msg.DestructMsgsResp, err error) {
func (m *msgServer) DestructMsgs(ctx context.Context, req *msg.DestructMsgsReq) (*msg.DestructMsgsResp, error) {
if err := authverify.CheckAdmin(ctx, m.config.Share.IMAdminUserID); err != nil {
return nil, err
}
if req.Timestamp > time.Now().UnixMilli() {
return nil, errs.ErrArgs.WrapMsg("request millisecond timestamp error")
docs, err := m.MsgDatabase.GetRandBeforeMsg(ctx, req.Timestamp, int(req.Limit))
if err != nil {
return nil, err
}
var (
docNum int
msgNum int
start = time.Now()
getLimit = 5000
)

destructMsg := func(ctx context.Context) (bool, error) {
docIDs, err := m.MsgDatabase.GetDocIDs(ctx)
if err != nil {
return false, err
}

msgs, err := m.MsgDatabase.GetBeforeMsg(ctx, req.Timestamp, docIDs, getLimit)
if err != nil {
return false, err
for i, doc := range docs {
if err := m.MsgDatabase.DeleteDoc(ctx, doc.DocID); err != nil {
return nil, err
}
if len(msgs) == 0 {
return false, nil
log.ZDebug(ctx, "DestructMsgs delete doc", "index", i, "docID", doc.DocID)
index := strings.LastIndex(doc.DocID, ":")
if index < 0 {
continue
}

for _, msg := range msgs {
index, err := m.MsgDatabase.DeleteDocMsgBefore(ctx, req.Timestamp, msg)
if err != nil {
return false, err
var minSeq int64
for _, model := range doc.Msg {
if model.Msg == nil {
continue
}
if len(index) == 0 {
return false, errs.ErrInternalServer.WrapMsg("delete doc msg failed")
if model.Msg.Seq > minSeq {
minSeq = model.Msg.Seq
}

docNum++
msgNum += len(index)
}

return true, nil
}

_, err = destructMsg(ctx)
if err != nil {
log.ZError(ctx, "clear msg failed", err, "docNum", docNum, "msgNum", msgNum, "cost", time.Since(start))
return nil, err
if minSeq <= 0 {
continue
}
conversationID := doc.DocID[:index]
if conversationID == "" {
continue
}
minSeq++
if err := m.MsgDatabase.SetMinSeq(ctx, conversationID, minSeq); err != nil {
return nil, err
}
log.ZDebug(ctx, "DestructMsgs delete doc set min seq", "index", i, "docID", doc.DocID, "conversationID", conversationID, "setMinSeq", minSeq)
}

log.ZDebug(ctx, "clearing message", "docNum", docNum, "msgNum", msgNum, "cost", time.Since(start))

return &msg.DestructMsgsResp{}, nil
return &msg.DestructMsgsResp{Count: int32(len(docs))}, nil
}

// soft delete for user self
func (m *msgServer) ClearMsg(ctx context.Context, req *msg.ClearMsgReq) (_ *msg.ClearMsgResp, err error) {
func (m *msgServer) ClearMsg(ctx context.Context, req *msg.ClearMsgReq) (*msg.ClearMsgResp, error) {
temp := convert.ConversationsPb2DB(req.Conversations)

batchNum := 100
Expand Down Expand Up @@ -137,3 +124,11 @@ func (m *msgServer) ClearMsg(ctx context.Context, req *msg.ClearMsgReq) (_ *msg.

return nil, nil
}

func (m *msgServer) GetLastMessageSeqByTime(ctx context.Context, req *msg.GetLastMessageSeqByTimeReq) (*msg.GetLastMessageSeqByTimeResp, error) {
seq, err := m.MsgDatabase.GetLastMessageSeqByTime(ctx, req.ConversationID, req.Time)
if err != nil {
return nil, err
}
return &msg.GetLastMessageSeqByTimeResp{Seq: seq}, nil
}
99 changes: 22 additions & 77 deletions internal/rpc/third/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,14 @@ import (
"encoding/base64"
"encoding/hex"
"encoding/json"
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
"path"
"strconv"
"time"

"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"go.mongodb.org/mongo-driver/mongo"

"github.com/google/uuid"
"github.com/openimsdk/open-im-server/v3/pkg/common/servererrs"
"github.com/openimsdk/protocol/sdkws"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/protocol/third"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log"
Expand Down Expand Up @@ -288,87 +285,35 @@ func (t *thirdServer) apiAddress(prefix, name string) string {
}

func (t *thirdServer) DeleteOutdatedData(ctx context.Context, req *third.DeleteOutdatedDataReq) (*third.DeleteOutdatedDataResp, error) {
var conf config.Third
expireTime := time.UnixMilli(req.ExpireTime)

findPagination := &sdkws.RequestPagination{
PageNumber: 1,
ShowNumber: 500,
if err := authverify.CheckAdmin(ctx, t.config.Share.IMAdminUserID); err != nil {
return nil, err
}

engine := t.config.RpcConfig.Object.Enable
expireTime := time.UnixMilli(req.ExpireTime)
// Find all expired data in S3 database
total, models, err := t.s3dataBase.FindNeedDeleteObjectByDB(ctx, expireTime, req.ObjectGroup, findPagination)
if err != nil && errs.Unwrap(err) != mongo.ErrNoDocuments {
return nil, errs.Wrap(err)
}

if total == 0 {
log.ZDebug(ctx, "Not have OutdatedData", "delete Total", total)
return &third.DeleteOutdatedDataResp{Count: int32(total)}, nil
}

needDelObjectKeys := make([]string, len(models))
for _, model := range models {
needDelObjectKeys = append(needDelObjectKeys, model.Key)
models, err := t.s3dataBase.FindExpirationObject(ctx, engine, expireTime, req.ObjectGroup, int64(req.Limit))
if err != nil {
return nil, err
}

// Remove duplicate keys, have the same key use in different models
needDelObjectKeys = datautil.Distinct(needDelObjectKeys)

for _, key := range needDelObjectKeys {
// Find all models by key
keyModels, err := t.s3dataBase.FindModelsByKey(ctx, key)
if err != nil && errs.Unwrap(err) != mongo.ErrNoDocuments {
for i, obj := range models {
if err := t.s3dataBase.DeleteSpecifiedData(ctx, engine, []string{obj.Name}); err != nil {
return nil, errs.Wrap(err)
}

// check keyModels, if all keyModels.
needDelKey := true // Default can delete
for _, keymodel := range keyModels {
// If group is empty or CreateTime is after expireTime, can't delete this key
if keymodel.Group == "" || keymodel.CreateTime.After(expireTime) {
needDelKey = false
break
}
}

// If this object is not referenced by not expire data, delete it
if needDelKey && t.minio != nil {
// If have a thumbnail, delete it
thumbnailKey, _ := t.getMinioImageThumbnailKey(ctx, key)
if thumbnailKey != "" {
err := t.s3dataBase.DeleteObject(ctx, thumbnailKey)
if err != nil {
log.ZWarn(ctx, "Delete thumbnail object is error:", errs.Wrap(err), "thumbnailKey", thumbnailKey)
}
}

// Delete object
err = t.s3dataBase.DeleteObject(ctx, key)
if err != nil {
log.ZWarn(ctx, "Delete object is error", errs.Wrap(err), "object key", key)
}

// Delete cache key
err = t.s3dataBase.DelS3Key(ctx, conf.Object.Enable, key)
if err != nil {
log.ZWarn(ctx, "Delete cache key is error:", errs.Wrap(err), "cache S3 key:", key)
}
if err := t.s3dataBase.DelS3Key(ctx, engine, obj.Name); err != nil {
return nil, err
}
}

// handle delete data in S3 database
for _, model := range models {
// Delete all expired data row in S3 database
err := t.s3dataBase.DeleteSpecifiedData(ctx, model.Engine, model.Name)
count, err := t.s3dataBase.GetKeyCount(ctx, engine, obj.Key)
if err != nil {
return nil, errs.Wrap(err)
return nil, err
}
log.ZDebug(ctx, "delete s3 object record", "index", i, "s3", obj, "count", count)
if count == 0 {
if err := t.s3.DeleteObject(ctx, obj.Key); err != nil {
return nil, err
}
}
}

log.ZDebug(ctx, "DeleteOutdatedData", "delete Total", total)

return &third.DeleteOutdatedDataResp{Count: int32(total)}, nil
return &third.DeleteOutdatedDataResp{Count: int32(len(models))}, nil
}

type FormDataMate struct {
Expand Down
14 changes: 4 additions & 10 deletions internal/rpc/third/third.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ type thirdServer struct {
s3dataBase controller.S3Database
defaultExpire time.Duration
config *Config
minio *minio.Minio
s3 s3.Interface
}

type Config struct {
Expand Down Expand Up @@ -79,13 +79,11 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg
// Select the oss method according to the profile policy
enable := config.RpcConfig.Object.Enable
var (
o s3.Interface
minioCli *minio.Minio
o s3.Interface
)
switch enable {
case "minio":
minioCli, err = minio.NewMinio(ctx, redis.NewMinioCache(rdb), *config.MinioConfig.Build())
o = minioCli
o, err = minio.NewMinio(ctx, redis.NewMinioCache(rdb), *config.MinioConfig.Build())
case "cos":
o, err = cos.NewCos(*config.RpcConfig.Object.Cos.Build())
case "oss":
Expand All @@ -106,15 +104,11 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg
s3dataBase: controller.NewS3Database(rdb, o, s3db),
defaultExpire: time.Hour * 24 * 7,
config: config,
minio: minioCli,
s3: o,
})
return nil
}

func (t *thirdServer) getMinioImageThumbnailKey(ctx context.Context, name string) (string, error) {
return t.minio.GetImageThumbnailKey(ctx, name)
}

func (t *thirdServer) FcmUpdateToken(ctx context.Context, req *third.FcmUpdateTokenReq) (resp *third.FcmUpdateTokenResp, err error) {
err = t.thirdDatabase.FcmUpdateToken(ctx, req.Account, int(req.PlatformID), req.FcmToken, req.ExpireTime)
if err != nil {
Expand Down
Loading

0 comments on commit e989506

Please sign in to comment.