Skip to content

Commit

Permalink
refactoring scheduled tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
withchao committed Dec 18, 2024
1 parent 3753b81 commit f03cbec
Show file tree
Hide file tree
Showing 9 changed files with 175 additions and 200 deletions.
93 changes: 16 additions & 77 deletions internal/rpc/third/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,9 @@ import (
"strconv"
"time"

"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"go.mongodb.org/mongo-driver/mongo"

"github.com/google/uuid"
"github.com/openimsdk/open-im-server/v3/pkg/common/servererrs"
"github.com/openimsdk/protocol/sdkws"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/model"
"github.com/openimsdk/protocol/third"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log"
Expand Down Expand Up @@ -288,87 +284,30 @@ func (t *thirdServer) apiAddress(prefix, name string) string {
}

func (t *thirdServer) DeleteOutdatedData(ctx context.Context, req *third.DeleteOutdatedDataReq) (*third.DeleteOutdatedDataResp, error) {
var conf config.Third
engine := t.config.RpcConfig.Object.Enable
expireTime := time.UnixMilli(req.ExpireTime)

findPagination := &sdkws.RequestPagination{
PageNumber: 1,
ShowNumber: 500,
}

// Find all expired data in S3 database
total, models, err := t.s3dataBase.FindNeedDeleteObjectByDB(ctx, expireTime, req.ObjectGroup, findPagination)
if err != nil && errs.Unwrap(err) != mongo.ErrNoDocuments {
return nil, errs.Wrap(err)
}

if total == 0 {
log.ZDebug(ctx, "Not have OutdatedData", "delete Total", total)
return &third.DeleteOutdatedDataResp{Count: int32(total)}, nil
}

needDelObjectKeys := make([]string, len(models))
for _, model := range models {
needDelObjectKeys = append(needDelObjectKeys, model.Key)
models, err := t.s3dataBase.FindExpirationObject(ctx, engine, expireTime, req.ObjectGroup, int64(req.Count))

Check failure on line 290 in internal/rpc/third/s3.go

View workflow job for this annotation

GitHub Actions / Test with go 1.21.x on ubuntu-latest

req.Count undefined (type *"github.com/openimsdk/protocol/third".DeleteOutdatedDataReq has no field or method Count)

Check failure on line 290 in internal/rpc/third/s3.go

View workflow job for this annotation

GitHub Actions / Test with go 1.21.x on ubuntu-latest

req.Count undefined (type *"github.com/openimsdk/protocol/third".DeleteOutdatedDataReq has no field or method Count)

Check failure on line 290 in internal/rpc/third/s3.go

View workflow job for this annotation

GitHub Actions / Test with go 1.21.x on ubuntu-latest

req.Count undefined (type *"github.com/openimsdk/protocol/third".DeleteOutdatedDataReq has no field or method Count)

Check failure on line 290 in internal/rpc/third/s3.go

View workflow job for this annotation

GitHub Actions / Test with go 1.21.x on ubuntu-latest

req.Count undefined (type *"github.com/openimsdk/protocol/third".DeleteOutdatedDataReq has no field or method Count)

Check failure on line 290 in internal/rpc/third/s3.go

View workflow job for this annotation

GitHub Actions / Benchmark Test with go 1.22.x on ubuntu-latest

req.Count undefined (type *"github.com/openimsdk/protocol/third".DeleteOutdatedDataReq has no field or method Count)

Check failure on line 290 in internal/rpc/third/s3.go

View workflow job for this annotation

GitHub Actions / Benchmark Test with go 1.22.x on ubuntu-latest

req.Count undefined (type *"github.com/openimsdk/protocol/third".DeleteOutdatedDataReq has no field or method Count)

Check failure on line 290 in internal/rpc/third/s3.go

View workflow job for this annotation

GitHub Actions / Benchmark Test with go 1.22.x on ubuntu-latest

req.Count undefined (type *"github.com/openimsdk/protocol/third".DeleteOutdatedDataReq has no field or method Count)

Check failure on line 290 in internal/rpc/third/s3.go

View workflow job for this annotation

GitHub Actions / Benchmark Test with go 1.22.x on ubuntu-latest

req.Count undefined (type *"github.com/openimsdk/protocol/third".DeleteOutdatedDataReq has no field or method Count)

Check failure on line 290 in internal/rpc/third/s3.go

View workflow job for this annotation

GitHub Actions / Test with go 1.22.x on ubuntu-latest

req.Count undefined (type *"github.com/openimsdk/protocol/third".DeleteOutdatedDataReq has no field or method Count)

Check failure on line 290 in internal/rpc/third/s3.go

View workflow job for this annotation

GitHub Actions / Test with go 1.22.x on ubuntu-latest

req.Count undefined (type *"github.com/openimsdk/protocol/third".DeleteOutdatedDataReq has no field or method Count)

Check failure on line 290 in internal/rpc/third/s3.go

View workflow job for this annotation

GitHub Actions / Test with go 1.22.x on ubuntu-latest

req.Count undefined (type *"github.com/openimsdk/protocol/third".DeleteOutdatedDataReq has no field or method Count)

Check failure on line 290 in internal/rpc/third/s3.go

View workflow job for this annotation

GitHub Actions / Test with go 1.22.x on ubuntu-latest

req.Count undefined (type *"github.com/openimsdk/protocol/third".DeleteOutdatedDataReq has no field or method Count)
if err != nil {
return nil, err
}

// Remove duplicate keys, have the same key use in different models
needDelObjectKeys = datautil.Distinct(needDelObjectKeys)

for _, key := range needDelObjectKeys {
// Find all models by key
keyModels, err := t.s3dataBase.FindModelsByKey(ctx, key)
if err != nil && errs.Unwrap(err) != mongo.ErrNoDocuments {
if len(models) > 0 {
names := datautil.Batch(func(o *model.Object) string {
return o.Name
}, models)
if err := t.s3dataBase.DeleteSpecifiedData(ctx, engine, names); err != nil {
return nil, errs.Wrap(err)
}

// check keyModels, if all keyModels.
needDelKey := true // Default can delete
for _, keymodel := range keyModels {
// If group is empty or CreateTime is after expireTime, can't delete this key
if keymodel.Group == "" || keymodel.CreateTime.After(expireTime) {
needDelKey = false
break
}
if err := t.s3dataBase.DelS3Key(ctx, engine, names...); err != nil {
return nil, err
}

// If this object is not referenced by not expire data, delete it
if needDelKey && t.minio != nil {
// If have a thumbnail, delete it
thumbnailKey, _ := t.getMinioImageThumbnailKey(ctx, key)
if thumbnailKey != "" {
err := t.s3dataBase.DeleteObject(ctx, thumbnailKey)
if err != nil {
log.ZWarn(ctx, "Delete thumbnail object is error:", errs.Wrap(err), "thumbnailKey", thumbnailKey)
}
}

// Delete object
err = t.s3dataBase.DeleteObject(ctx, key)
if err != nil {
log.ZWarn(ctx, "Delete object is error", errs.Wrap(err), "object key", key)
}

// Delete cache key
err = t.s3dataBase.DelS3Key(ctx, conf.Object.Enable, key)
if err != nil {
log.ZWarn(ctx, "Delete cache key is error:", errs.Wrap(err), "cache S3 key:", key)
for _, object := range models {
if err := t.s3.DeleteObject(ctx, object.Key); err != nil {
return nil, err
}
}
}

// handle delete data in S3 database
for _, model := range models {
// Delete all expired data row in S3 database
err := t.s3dataBase.DeleteSpecifiedData(ctx, model.Engine, model.Name)
if err != nil {
return nil, errs.Wrap(err)
}
}

log.ZDebug(ctx, "DeleteOutdatedData", "delete Total", total)

return &third.DeleteOutdatedDataResp{Count: int32(total)}, nil
return &third.DeleteOutdatedDataResp{Count: int32(len(models))}, nil
}

type FormDataMate struct {
Expand Down
14 changes: 4 additions & 10 deletions internal/rpc/third/third.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ type thirdServer struct {
s3dataBase controller.S3Database
defaultExpire time.Duration
config *Config
minio *minio.Minio
s3 s3.Interface
}

type Config struct {
Expand Down Expand Up @@ -79,13 +79,11 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg
// Select the oss method according to the profile policy
enable := config.RpcConfig.Object.Enable
var (
o s3.Interface
minioCli *minio.Minio
o s3.Interface
)
switch enable {
case "minio":
minioCli, err = minio.NewMinio(ctx, redis.NewMinioCache(rdb), *config.MinioConfig.Build())
o = minioCli
o, err = minio.NewMinio(ctx, redis.NewMinioCache(rdb), *config.MinioConfig.Build())
case "cos":
o, err = cos.NewCos(*config.RpcConfig.Object.Cos.Build())
case "oss":
Expand All @@ -106,15 +104,11 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg
s3dataBase: controller.NewS3Database(rdb, o, s3db),
defaultExpire: time.Hour * 24 * 7,
config: config,
minio: minioCli,
s3: o,
})
return nil
}

func (t *thirdServer) getMinioImageThumbnailKey(ctx context.Context, name string) (string, error) {
return t.minio.GetImageThumbnailKey(ctx, name)
}

func (t *thirdServer) FcmUpdateToken(ctx context.Context, req *third.FcmUpdateTokenReq) (resp *third.FcmUpdateTokenResp, err error) {
err = t.thirdDatabase.FcmUpdateToken(ctx, req.Account, int(req.PlatformID), req.FcmToken, req.ExpireTime)
if err != nil {
Expand Down
125 changes: 46 additions & 79 deletions internal/tools/cron_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,6 @@ package tools

import (
"context"
"fmt"
"os"
"time"

"github.com/openimsdk/open-im-server/v3/pkg/common/config"
kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister"
pbconversation "github.com/openimsdk/protocol/conversation"
Expand Down Expand Up @@ -60,87 +56,58 @@ func Start(ctx context.Context, config *CronTaskConfig) error {
return err
}

msgClient := msg.NewMsgClient(msgConn)
conversationClient := pbconversation.NewConversationClient(conversationConn)
thirdClient := third.NewThirdClient(thirdConn)

crontab := cron.New()

// scheduled hard delete outdated Msgs in specific time.
destructMsgsFunc := func() {
now := time.Now()
deltime := now.Add(-time.Hour * 24 * time.Duration(config.CronTask.RetainChatRecords))
ctx := mcontext.SetOperationID(ctx, fmt.Sprintf("cron_%d_%d", os.Getpid(), deltime.UnixMilli()))
log.ZDebug(ctx, "Destruct chat records", "deltime", deltime, "timestamp", deltime.UnixMilli())

if _, err := msgClient.DestructMsgs(ctx, &msg.DestructMsgsReq{Timestamp: deltime.UnixMilli()}); err != nil {
log.ZError(ctx, "cron destruct chat records failed", err, "deltime", deltime, "cont", time.Since(now))
return
}
log.ZDebug(ctx, "cron destruct chat records success", "deltime", deltime, "cont", time.Since(now))
}
if _, err := crontab.AddFunc(config.CronTask.CronExecuteTime, destructMsgsFunc); err != nil {
return errs.Wrap(err)
srv := &cronServer{
ctx: ctx,
config: config,
cron: cron.New(),
msgClient: msg.NewMsgClient(msgConn),
conversationClient: pbconversation.NewConversationClient(conversationConn),
thirdClient: third.NewThirdClient(thirdConn),
}

// scheduled soft delete outdated Msgs in specific time when user set `is_msg_destruct` feature.
clearMsgFunc := func() {
now := time.Now()
ctx := mcontext.SetOperationID(ctx, fmt.Sprintf("cron_%d_%d", os.Getpid(), now.UnixMilli()))
log.ZDebug(ctx, "clear msg cron start", "now", now)

conversations, err := conversationClient.GetConversationsNeedClearMsg(ctx, &pbconversation.GetConversationsNeedClearMsgReq{})
if err != nil {
log.ZError(ctx, "Get conversation need Destruct msgs failed.", err)
return
}

_, err = msgClient.ClearMsg(ctx, &msg.ClearMsgReq{Conversations: conversations.Conversations})
if err != nil {
log.ZError(ctx, "Clear Msg failed.", err)
return
}

log.ZDebug(ctx, "clear msg cron task completed", "cont", time.Since(now))
}
if _, err := crontab.AddFunc(config.CronTask.CronExecuteTime, clearMsgFunc); err != nil {
return errs.Wrap(err)
if err := srv.registerClearS3(); err != nil {
return err
}

// scheduled delete outdated file Objects and their datas in specific time.
deleteObjectFunc := func() {
now := time.Now()
executeNum := 5
// number of pagination. if need modify, need update value in third.DeleteOutdatedData
pageShowNumber := 500
deleteTime := now.Add(-time.Hour * 24 * time.Duration(config.CronTask.FileExpireTime))
ctx := mcontext.SetOperationID(ctx, fmt.Sprintf("cron_%d_%d", os.Getpid(), deleteTime.UnixMilli()))
log.ZDebug(ctx, "deleteoutDatedData", "deletetime", deleteTime, "timestamp", deleteTime.UnixMilli())

if len(config.CronTask.DeleteObjectType) == 0 {
log.ZDebug(ctx, "cron deleteoutDatedData not type need delete", "deletetime", deleteTime, "DeleteObjectType", config.CronTask.DeleteObjectType, "cont", time.Since(now))
return
}

for i := 0; i < executeNum; i++ {
resp, err := thirdClient.DeleteOutdatedData(ctx, &third.DeleteOutdatedDataReq{ExpireTime: deleteTime.UnixMilli(), ObjectGroup: config.CronTask.DeleteObjectType})
if err != nil {
log.ZError(ctx, "cron deleteoutDatedData failed", err, "deleteTime", deleteTime, "cont", time.Since(now))
return
}
if resp.Count == 0 || resp.Count < int32(pageShowNumber) {
break
}
}

log.ZDebug(ctx, "cron deleteoutDatedData success", "deltime", deleteTime, "cont", time.Since(now))
if err := srv.registerDeleteMsg(); err != nil {
return err
}
if _, err := crontab.AddFunc(config.CronTask.CronExecuteTime, deleteObjectFunc); err != nil {
return errs.Wrap(err)
if err := srv.registerClearUserMsg(); err != nil {
return err
}

log.ZDebug(ctx, "start cron task", "CronExecuteTime", config.CronTask.CronExecuteTime)
crontab.Start()
srv.cron.Start()
<-ctx.Done()
return nil
}

type cronServer struct {
ctx context.Context
config *CronTaskConfig
cron *cron.Cron
msgClient msg.MsgClient
conversationClient pbconversation.ConversationClient
thirdClient third.ThirdClient
}

func (c *cronServer) registerClearS3() error {
if c.config.CronTask.FileExpireTime <= 0 || len(c.config.CronTask.DeleteObjectType) == 0 {
log.ZInfo(c.ctx, "disable scheduled cleanup of s3", "fileExpireTime", c.config.CronTask.FileExpireTime, "deleteObjectType", c.config.CronTask.DeleteObjectType)
return nil
}
_, err := c.cron.AddFunc(c.config.CronTask.CronExecuteTime, c.clearS3)
return errs.WrapMsg(err, "failed to register clear s3 cron task")
}

func (c *cronServer) registerDeleteMsg() error {
if c.config.CronTask.RetainChatRecords <= 0 {
log.ZInfo(c.ctx, "disable scheduled cleanup of chat records", "retainChatRecords", c.config.CronTask.RetainChatRecords)
return nil
}
_, err := c.cron.AddFunc(c.config.CronTask.CronExecuteTime, c.deleteMsg)
return errs.WrapMsg(err, "failed to register delete msg cron task")
}

func (c *cronServer) registerClearUserMsg() error {
_, err := c.cron.AddFunc(c.config.CronTask.CronExecuteTime, c.clearUserMsg)
return errs.WrapMsg(err, "failed to register clear user msg cron task")
}
23 changes: 23 additions & 0 deletions internal/tools/msg.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package tools

import (
"fmt"
"github.com/openimsdk/protocol/msg"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/mcontext"
"os"
"time"
)

func (c *cronServer) deleteMsg() {
now := time.Now()
deltime := now.Add(-time.Hour * 24 * time.Duration(c.config.CronTask.RetainChatRecords))
ctx := mcontext.SetOperationID(c.ctx, fmt.Sprintf("cron_%d_%d", os.Getpid(), deltime.UnixMilli()))
log.ZDebug(ctx, "Destruct chat records", "deltime", deltime, "timestamp", deltime.UnixMilli())

if _, err := c.msgClient.DestructMsgs(ctx, &msg.DestructMsgsReq{Timestamp: deltime.UnixMilli()}); err != nil {
log.ZError(ctx, "cron destruct chat records failed", err, "deltime", deltime, "cont", time.Since(now))
return
}
log.ZDebug(ctx, "cron destruct chat records success", "deltime", deltime, "cont", time.Since(now))
}
34 changes: 34 additions & 0 deletions internal/tools/s3.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package tools

import (
"fmt"
"github.com/openimsdk/protocol/third"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/mcontext"
"os"
"time"
)

func (c *cronServer) clearS3() {
start := time.Now()
executeNum := 10
// number of pagination. if need modify, need update value in third.DeleteOutdatedData
pageShowNumber := 500
deleteTime := start.Add(-time.Hour * 24 * time.Duration(c.config.CronTask.FileExpireTime))
operationID := fmt.Sprintf("cron_%d_%d", os.Getpid(), deleteTime.UnixMilli())
ctx := mcontext.SetOperationID(c.ctx, operationID)
log.ZDebug(ctx, "deleteoutDatedData", "deletetime", deleteTime, "timestamp", deleteTime.UnixMilli())
for i := 1; i <= executeNum; i++ {
ctx := mcontext.SetOperationID(c.ctx, fmt.Sprintf("%s_%d", operationID, i))
resp, err := c.thirdClient.DeleteOutdatedData(ctx, &third.DeleteOutdatedDataReq{ExpireTime: deleteTime.UnixMilli(), ObjectGroup: c.config.CronTask.DeleteObjectType, Count: int32(pageShowNumber)})

Check failure on line 23 in internal/tools/s3.go

View workflow job for this annotation

GitHub Actions / Test with go 1.21.x on ubuntu-latest

unknown field Count in struct literal of type third.DeleteOutdatedDataReq

Check failure on line 23 in internal/tools/s3.go

View workflow job for this annotation

GitHub Actions / Test with go 1.21.x on ubuntu-latest

unknown field Count in struct literal of type third.DeleteOutdatedDataReq

Check failure on line 23 in internal/tools/s3.go

View workflow job for this annotation

GitHub Actions / Test with go 1.21.x on ubuntu-latest

unknown field Count in struct literal of type third.DeleteOutdatedDataReq

Check failure on line 23 in internal/tools/s3.go

View workflow job for this annotation

GitHub Actions / Test with go 1.21.x on ubuntu-latest

unknown field Count in struct literal of type third.DeleteOutdatedDataReq

Check failure on line 23 in internal/tools/s3.go

View workflow job for this annotation

GitHub Actions / Benchmark Test with go 1.22.x on ubuntu-latest

unknown field Count in struct literal of type third.DeleteOutdatedDataReq

Check failure on line 23 in internal/tools/s3.go

View workflow job for this annotation

GitHub Actions / Benchmark Test with go 1.22.x on ubuntu-latest

unknown field Count in struct literal of type third.DeleteOutdatedDataReq

Check failure on line 23 in internal/tools/s3.go

View workflow job for this annotation

GitHub Actions / Benchmark Test with go 1.22.x on ubuntu-latest

unknown field Count in struct literal of type third.DeleteOutdatedDataReq

Check failure on line 23 in internal/tools/s3.go

View workflow job for this annotation

GitHub Actions / Benchmark Test with go 1.22.x on ubuntu-latest

unknown field Count in struct literal of type third.DeleteOutdatedDataReq

Check failure on line 23 in internal/tools/s3.go

View workflow job for this annotation

GitHub Actions / Test with go 1.22.x on ubuntu-latest

unknown field Count in struct literal of type third.DeleteOutdatedDataReq

Check failure on line 23 in internal/tools/s3.go

View workflow job for this annotation

GitHub Actions / Test with go 1.22.x on ubuntu-latest

unknown field Count in struct literal of type third.DeleteOutdatedDataReq

Check failure on line 23 in internal/tools/s3.go

View workflow job for this annotation

GitHub Actions / Test with go 1.22.x on ubuntu-latest

unknown field Count in struct literal of type third.DeleteOutdatedDataReq

Check failure on line 23 in internal/tools/s3.go

View workflow job for this annotation

GitHub Actions / Test with go 1.22.x on ubuntu-latest

unknown field Count in struct literal of type third.DeleteOutdatedDataReq
if err != nil {
log.ZError(ctx, "cron deleteoutDatedData failed", err, "deleteTime", deleteTime, "cont", time.Since(start))
return
}
if resp.Count < int32(pageShowNumber) {
break
}
}

log.ZDebug(ctx, "cron deleteoutDatedData success", "deltime", deleteTime, "cont", time.Since(start))
}
31 changes: 31 additions & 0 deletions internal/tools/user_msg.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package tools

import (
"fmt"
pbconversation "github.com/openimsdk/protocol/conversation"
"github.com/openimsdk/protocol/msg"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/mcontext"
"os"
"time"
)

func (c *cronServer) clearUserMsg() {
now := time.Now()
ctx := mcontext.SetOperationID(c.ctx, fmt.Sprintf("cron_%d_%d", os.Getpid(), now.UnixMilli()))
log.ZDebug(ctx, "clear msg cron start", "now", now)

conversations, err := c.conversationClient.GetConversationsNeedClearMsg(ctx, &pbconversation.GetConversationsNeedClearMsgReq{})
if err != nil {
log.ZError(ctx, "Get conversation need Destruct msgs failed.", err)
return
}

_, err = c.msgClient.ClearMsg(ctx, &msg.ClearMsgReq{Conversations: conversations.Conversations})
if err != nil {
log.ZError(ctx, "Clear Msg failed.", err)
return
}

log.ZDebug(ctx, "clear msg cron task completed", "cont", time.Since(now))
}
Loading

0 comments on commit f03cbec

Please sign in to comment.