Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Optimize Scheduled Task #2985

Merged
merged 16 commits into from
Dec 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ require (
github.com/gorilla/websocket v1.5.1
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/mitchellh/mapstructure v1.5.0
github.com/openimsdk/protocol v0.0.72-alpha.66
github.com/openimsdk/tools v0.0.50-alpha.57
github.com/openimsdk/protocol v0.0.72-alpha.67
github.com/openimsdk/tools v0.0.50-alpha.58
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_golang v1.18.0
github.com/stretchr/testify v1.9.0
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -347,10 +347,10 @@ github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y=
github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM=
github.com/openimsdk/gomake v0.0.15-alpha.2 h1:5Q8yl8ezy2yx+q8/ucU/t4kJnDfCzNOrkXcDACCqtyM=
github.com/openimsdk/gomake v0.0.15-alpha.2/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI=
github.com/openimsdk/protocol v0.0.72-alpha.66 h1:5KoDY6M4T+pXg449ScF6hqeQ+WenBwNyUJn/t8W0oBQ=
github.com/openimsdk/protocol v0.0.72-alpha.66/go.mod h1:Iet+piS/jaS+kWWyj6EEr36mk4ISzIRYjoMSVA4dq2M=
github.com/openimsdk/tools v0.0.50-alpha.57 h1:oIKV6vYhqp7TRmZ6Pe+r9RNl1D5s7aB/kE9yQVEWcSY=
github.com/openimsdk/tools v0.0.50-alpha.57/go.mod h1:muCtxguNJv8lFwLei27UASu2Nvg4ERSeN0R4K5tivk0=
github.com/openimsdk/protocol v0.0.72-alpha.67 h1:zlLbVkoT0OYsjO2RCutQuDFllcfNvZfdYchvlR6UIe0=
github.com/openimsdk/protocol v0.0.72-alpha.67/go.mod h1:Iet+piS/jaS+kWWyj6EEr36mk4ISzIRYjoMSVA4dq2M=
github.com/openimsdk/tools v0.0.50-alpha.58 h1:hkFL02Bzzp/l5x+tb7kJ9zes7hilh65EQ4qEIthsQX4=
github.com/openimsdk/tools v0.0.50-alpha.58/go.mod h1:muCtxguNJv8lFwLei27UASu2Nvg4ERSeN0R4K5tivk0=
github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM=
github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs=
github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ=
Expand Down
1 change: 1 addition & 0 deletions internal/api/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ func newGinRouter(disCov discovery.SvcDiscoveryRegistry, config *Config) *gin.En
authRouterGroup.POST("/get_user_token", a.GetUserToken)
authRouterGroup.POST("/parse_token", a.ParseToken)
authRouterGroup.POST("/force_logout", a.ForceLogout)

}
// Third service
thirdGroup := r.Group("/third")
Expand Down
50 changes: 50 additions & 0 deletions internal/rpc/conversation/conversation.go
Original file line number Diff line number Diff line change
Expand Up @@ -763,3 +763,53 @@ func (c *conversationServer) GetPinnedConversationIDs(ctx context.Context, req *
}
return &pbconversation.GetPinnedConversationIDsResp{ConversationIDs: conversationIDs}, nil
}

func (c *conversationServer) ClearUserConversationMsg(ctx context.Context, req *pbconversation.ClearUserConversationMsgReq) (*pbconversation.ClearUserConversationMsgResp, error) {
conversations, err := c.conversationDatabase.FindRandConversation(ctx, req.Timestamp, int(req.Limit))
if err != nil {
return nil, err
}
latestMsgDestructTime := time.UnixMilli(req.Timestamp)
for i, conversation := range conversations {
if conversation.IsMsgDestruct == false || conversation.MsgDestructTime == 0 {
continue
}
rcpReq := &pbmsg.GetLastMessageSeqByTimeReq{ConversationID: conversation.ConversationID, Time: req.Timestamp - conversation.MsgDestructTime}
resp, err := pbmsg.GetLastMessageSeqByTime.Invoke(ctx, rcpReq)
if err != nil {
return nil, err
}
if resp.Seq <= 0 {
log.ZDebug(ctx, "ClearUserConversationMsg GetLastMessageSeqByTime seq <= 0", "index", i, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID, "msgDestructTime", conversation.MsgDestructTime, "seq", resp.Seq)
if err := c.setConversationMinSeqAndLatestMsgDestructTime(ctx, conversation.ConversationID, conversation.OwnerUserID, -1, latestMsgDestructTime); err != nil {
return nil, err
}
continue
}
resp.Seq++
if err := c.setConversationMinSeqAndLatestMsgDestructTime(ctx, conversation.ConversationID, conversation.OwnerUserID, resp.Seq, latestMsgDestructTime); err != nil {
return nil, err
}
log.ZDebug(ctx, "ClearUserConversationMsg set min seq", "index", i, "conversationID", conversation.ConversationID, "ownerUserID", conversation.OwnerUserID, "seq", resp.Seq, "msgDestructTime", conversation.MsgDestructTime)
}
return &pbconversation.ClearUserConversationMsgResp{Count: int32(len(conversations))}, nil
}

func (c *conversationServer) setConversationMinSeqAndLatestMsgDestructTime(ctx context.Context, conversationID string, ownerUserID string, minSeq int64, latestMsgDestructTime time.Time) error {
update := map[string]any{
"latest_msg_destruct_time": latestMsgDestructTime,
}
if minSeq >= 0 {
req := &pbmsg.SetUserConversationMinSeqReq{ConversationID: conversationID, OwnerUserID: []string{ownerUserID}, MinSeq: minSeq}
if _, err := pbmsg.SetUserConversationMinSeqCaller.Invoke(ctx, req); err != nil {
return err
}
update["min_seq"] = minSeq
}

if err := c.conversationDatabase.UpdateUsersConversationField(ctx, []string{ownerUserID}, conversationID, update); err != nil {
return err
}
c.conversationNotificationSender.ConversationChangeNotification(ctx, ownerUserID, []string{conversationID})
return nil
}
85 changes: 40 additions & 45 deletions internal/rpc/msg/clear.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@ package msg

import (
"context"
"strings"
"time"

"github.com/openimsdk/open-im-server/v3/pkg/authverify"
"github.com/openimsdk/open-im-server/v3/pkg/common/convert"
pbconv "github.com/openimsdk/protocol/conversation"
"github.com/openimsdk/protocol/msg"
"github.com/openimsdk/protocol/wrapperspb"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/mcontext"
"github.com/openimsdk/tools/utils/datautil"
Expand All @@ -19,63 +19,50 @@ import (
)

// hard delete in Database.
func (m *msgServer) DestructMsgs(ctx context.Context, req *msg.DestructMsgsReq) (_ *msg.DestructMsgsResp, err error) {
func (m *msgServer) DestructMsgs(ctx context.Context, req *msg.DestructMsgsReq) (*msg.DestructMsgsResp, error) {
if err := authverify.CheckAdmin(ctx, m.config.Share.IMAdminUserID); err != nil {
return nil, err
}
if req.Timestamp > time.Now().UnixMilli() {
return nil, errs.ErrArgs.WrapMsg("request millisecond timestamp error")
docs, err := m.MsgDatabase.GetRandBeforeMsg(ctx, req.Timestamp, int(req.Limit))
if err != nil {
return nil, err
}
var (
docNum int
msgNum int
start = time.Now()
getLimit = 5000
)

destructMsg := func(ctx context.Context) (bool, error) {
docIDs, err := m.MsgDatabase.GetDocIDs(ctx)
if err != nil {
return false, err
}

msgs, err := m.MsgDatabase.GetBeforeMsg(ctx, req.Timestamp, docIDs, getLimit)
if err != nil {
return false, err
for i, doc := range docs {
if err := m.MsgDatabase.DeleteDoc(ctx, doc.DocID); err != nil {
return nil, err
}
if len(msgs) == 0 {
return false, nil
log.ZDebug(ctx, "DestructMsgs delete doc", "index", i, "docID", doc.DocID)
index := strings.LastIndex(doc.DocID, ":")
if index < 0 {
continue
}

for _, msg := range msgs {
index, err := m.MsgDatabase.DeleteDocMsgBefore(ctx, req.Timestamp, msg)
if err != nil {
return false, err
var minSeq int64
for _, model := range doc.Msg {
if model.Msg == nil {
continue
}
if len(index) == 0 {
return false, errs.ErrInternalServer.WrapMsg("delete doc msg failed")
if model.Msg.Seq > minSeq {
minSeq = model.Msg.Seq
}

docNum++
msgNum += len(index)
}

return true, nil
}

_, err = destructMsg(ctx)
if err != nil {
log.ZError(ctx, "clear msg failed", err, "docNum", docNum, "msgNum", msgNum, "cost", time.Since(start))
return nil, err
if minSeq <= 0 {
continue
}
conversationID := doc.DocID[:index]
if conversationID == "" {
continue
}
minSeq++
if err := m.MsgDatabase.SetMinSeq(ctx, conversationID, minSeq); err != nil {
return nil, err
}
log.ZDebug(ctx, "DestructMsgs delete doc set min seq", "index", i, "docID", doc.DocID, "conversationID", conversationID, "setMinSeq", minSeq)
}

log.ZDebug(ctx, "clearing message", "docNum", docNum, "msgNum", msgNum, "cost", time.Since(start))

return &msg.DestructMsgsResp{}, nil
return &msg.DestructMsgsResp{Count: int32(len(docs))}, nil
}

// soft delete for user self
func (m *msgServer) ClearMsg(ctx context.Context, req *msg.ClearMsgReq) (_ *msg.ClearMsgResp, err error) {
func (m *msgServer) ClearMsg(ctx context.Context, req *msg.ClearMsgReq) (*msg.ClearMsgResp, error) {
temp := convert.ConversationsPb2DB(req.Conversations)

batchNum := 100
Expand Down Expand Up @@ -137,3 +124,11 @@ func (m *msgServer) ClearMsg(ctx context.Context, req *msg.ClearMsgReq) (_ *msg.

return nil, nil
}

func (m *msgServer) GetLastMessageSeqByTime(ctx context.Context, req *msg.GetLastMessageSeqByTimeReq) (*msg.GetLastMessageSeqByTimeResp, error) {
seq, err := m.MsgDatabase.GetLastMessageSeqByTime(ctx, req.ConversationID, req.Time)
if err != nil {
return nil, err
}
return &msg.GetLastMessageSeqByTimeResp{Seq: seq}, nil
}
99 changes: 22 additions & 77 deletions internal/rpc/third/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,14 @@ import (
"encoding/base64"
"encoding/hex"
"encoding/json"
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
"path"
"strconv"
"time"

"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"go.mongodb.org/mongo-driver/mongo"

"github.com/google/uuid"
"github.com/openimsdk/open-im-server/v3/pkg/common/servererrs"
"github.com/openimsdk/protocol/sdkws"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/protocol/third"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log"
Expand Down Expand Up @@ -288,87 +285,35 @@ func (t *thirdServer) apiAddress(prefix, name string) string {
}

func (t *thirdServer) DeleteOutdatedData(ctx context.Context, req *third.DeleteOutdatedDataReq) (*third.DeleteOutdatedDataResp, error) {
var conf config.Third
expireTime := time.UnixMilli(req.ExpireTime)

findPagination := &sdkws.RequestPagination{
PageNumber: 1,
ShowNumber: 500,
if err := authverify.CheckAdmin(ctx, t.config.Share.IMAdminUserID); err != nil {
return nil, err
}

engine := t.config.RpcConfig.Object.Enable
expireTime := time.UnixMilli(req.ExpireTime)
// Find all expired data in S3 database
total, models, err := t.s3dataBase.FindNeedDeleteObjectByDB(ctx, expireTime, req.ObjectGroup, findPagination)
if err != nil && errs.Unwrap(err) != mongo.ErrNoDocuments {
return nil, errs.Wrap(err)
}

if total == 0 {
log.ZDebug(ctx, "Not have OutdatedData", "delete Total", total)
return &third.DeleteOutdatedDataResp{Count: int32(total)}, nil
}

needDelObjectKeys := make([]string, len(models))
for _, model := range models {
needDelObjectKeys = append(needDelObjectKeys, model.Key)
models, err := t.s3dataBase.FindExpirationObject(ctx, engine, expireTime, req.ObjectGroup, int64(req.Limit))
if err != nil {
return nil, err
}

// Remove duplicate keys, have the same key use in different models
needDelObjectKeys = datautil.Distinct(needDelObjectKeys)

for _, key := range needDelObjectKeys {
// Find all models by key
keyModels, err := t.s3dataBase.FindModelsByKey(ctx, key)
if err != nil && errs.Unwrap(err) != mongo.ErrNoDocuments {
for i, obj := range models {
if err := t.s3dataBase.DeleteSpecifiedData(ctx, engine, []string{obj.Name}); err != nil {
return nil, errs.Wrap(err)
}

// check keyModels, if all keyModels.
needDelKey := true // Default can delete
for _, keymodel := range keyModels {
// If group is empty or CreateTime is after expireTime, can't delete this key
if keymodel.Group == "" || keymodel.CreateTime.After(expireTime) {
needDelKey = false
break
}
}

// If this object is not referenced by not expire data, delete it
if needDelKey && t.minio != nil {
// If have a thumbnail, delete it
thumbnailKey, _ := t.getMinioImageThumbnailKey(ctx, key)
if thumbnailKey != "" {
err := t.s3dataBase.DeleteObject(ctx, thumbnailKey)
if err != nil {
log.ZWarn(ctx, "Delete thumbnail object is error:", errs.Wrap(err), "thumbnailKey", thumbnailKey)
}
}

// Delete object
err = t.s3dataBase.DeleteObject(ctx, key)
if err != nil {
log.ZWarn(ctx, "Delete object is error", errs.Wrap(err), "object key", key)
}

// Delete cache key
err = t.s3dataBase.DelS3Key(ctx, conf.Object.Enable, key)
if err != nil {
log.ZWarn(ctx, "Delete cache key is error:", errs.Wrap(err), "cache S3 key:", key)
}
if err := t.s3dataBase.DelS3Key(ctx, engine, obj.Name); err != nil {
return nil, err
}
}

// handle delete data in S3 database
for _, model := range models {
// Delete all expired data row in S3 database
err := t.s3dataBase.DeleteSpecifiedData(ctx, model.Engine, model.Name)
count, err := t.s3dataBase.GetKeyCount(ctx, engine, obj.Key)
if err != nil {
return nil, errs.Wrap(err)
return nil, err
}
log.ZDebug(ctx, "delete s3 object record", "index", i, "s3", obj, "count", count)
if count == 0 {
if err := t.s3.DeleteObject(ctx, obj.Key); err != nil {
return nil, err
}
}
}

log.ZDebug(ctx, "DeleteOutdatedData", "delete Total", total)

return &third.DeleteOutdatedDataResp{Count: int32(total)}, nil
return &third.DeleteOutdatedDataResp{Count: int32(len(models))}, nil
}

type FormDataMate struct {
Expand Down
14 changes: 4 additions & 10 deletions internal/rpc/third/third.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ type thirdServer struct {
s3dataBase controller.S3Database
defaultExpire time.Duration
config *Config
minio *minio.Minio
s3 s3.Interface
}

type Config struct {
Expand Down Expand Up @@ -79,13 +79,11 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg
// Select the oss method according to the profile policy
enable := config.RpcConfig.Object.Enable
var (
o s3.Interface
minioCli *minio.Minio
o s3.Interface
)
switch enable {
case "minio":
minioCli, err = minio.NewMinio(ctx, redis.NewMinioCache(rdb), *config.MinioConfig.Build())
o = minioCli
o, err = minio.NewMinio(ctx, redis.NewMinioCache(rdb), *config.MinioConfig.Build())
case "cos":
o, err = cos.NewCos(*config.RpcConfig.Object.Cos.Build())
case "oss":
Expand All @@ -106,15 +104,11 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg
s3dataBase: controller.NewS3Database(rdb, o, s3db),
defaultExpire: time.Hour * 24 * 7,
config: config,
minio: minioCli,
s3: o,
})
return nil
}

func (t *thirdServer) getMinioImageThumbnailKey(ctx context.Context, name string) (string, error) {
return t.minio.GetImageThumbnailKey(ctx, name)
}

func (t *thirdServer) FcmUpdateToken(ctx context.Context, req *third.FcmUpdateTokenReq) (resp *third.FcmUpdateTokenResp, err error) {
err = t.thirdDatabase.FcmUpdateToken(ctx, req.Account, int(req.PlatformID), req.FcmToken, req.ExpireTime)
if err != nil {
Expand Down
Loading
Loading