Skip to content

Commit

Permalink
refactoring scheduled tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
withchao committed Dec 19, 2024
1 parent f03cbec commit 705bc37
Show file tree
Hide file tree
Showing 9 changed files with 179 additions and 146 deletions.
71 changes: 31 additions & 40 deletions internal/rpc/msg/clear.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package msg

import (
"context"
"strings"
"time"

"github.com/openimsdk/open-im-server/v3/pkg/authverify"
Expand All @@ -26,52 +27,42 @@ func (m *msgServer) DestructMsgs(ctx context.Context, req *msg.DestructMsgsReq)
if req.Timestamp > time.Now().UnixMilli() {
return nil, errs.ErrArgs.WrapMsg("request millisecond timestamp error")
}
var (
docNum int
msgNum int
start = time.Now()
getLimit = 5000
)

destructMsg := func(ctx context.Context) (bool, error) {
docIDs, err := m.MsgDatabase.GetDocIDs(ctx)
if err != nil {
return false, err
}

msgs, err := m.MsgDatabase.GetBeforeMsg(ctx, req.Timestamp, docIDs, getLimit)
if err != nil {
return false, err
if req.Limit <= 0 {

Check failure on line 30 in internal/rpc/msg/clear.go

View workflow job for this annotation

GitHub Actions / Benchmark Test with go 1.22.x on ubuntu-latest

req.Limit undefined (type *"github.com/openimsdk/protocol/msg".DestructMsgsReq has no field or method Limit)

Check failure on line 30 in internal/rpc/msg/clear.go

View workflow job for this annotation

GitHub Actions / Benchmark Test with go 1.22.x on ubuntu-latest

req.Limit undefined (type *"github.com/openimsdk/protocol/msg".DestructMsgsReq has no field or method Limit)

Check failure on line 30 in internal/rpc/msg/clear.go

View workflow job for this annotation

GitHub Actions / Benchmark Test with go 1.22.x on ubuntu-latest

req.Limit undefined (type *"github.com/openimsdk/protocol/msg".DestructMsgsReq has no field or method Limit)

Check failure on line 30 in internal/rpc/msg/clear.go

View workflow job for this annotation

GitHub Actions / Test with go 1.21.x on ubuntu-latest

req.Limit undefined (type *"github.com/openimsdk/protocol/msg".DestructMsgsReq has no field or method Limit)

Check failure on line 30 in internal/rpc/msg/clear.go

View workflow job for this annotation

GitHub Actions / Test with go 1.21.x on ubuntu-latest

req.Limit undefined (type *"github.com/openimsdk/protocol/msg".DestructMsgsReq has no field or method Limit)

Check failure on line 30 in internal/rpc/msg/clear.go

View workflow job for this annotation

GitHub Actions / Test with go 1.21.x on ubuntu-latest

req.Limit undefined (type *"github.com/openimsdk/protocol/msg".DestructMsgsReq has no field or method Limit)

Check failure on line 30 in internal/rpc/msg/clear.go

View workflow job for this annotation

GitHub Actions / Test with go 1.22.x on ubuntu-latest

req.Limit undefined (type *"github.com/openimsdk/protocol/msg".DestructMsgsReq has no field or method Limit)

Check failure on line 30 in internal/rpc/msg/clear.go

View workflow job for this annotation

GitHub Actions / Test with go 1.22.x on ubuntu-latest

req.Limit undefined (type *"github.com/openimsdk/protocol/msg".DestructMsgsReq has no field or method Limit)

Check failure on line 30 in internal/rpc/msg/clear.go

View workflow job for this annotation

GitHub Actions / Test with go 1.22.x on ubuntu-latest

req.Limit undefined (type *"github.com/openimsdk/protocol/msg".DestructMsgsReq has no field or method Limit)

Check failure on line 30 in internal/rpc/msg/clear.go

View workflow job for this annotation

GitHub Actions / Test with go 1.22.x on ubuntu-latest

req.Limit undefined (type *"github.com/openimsdk/protocol/msg".DestructMsgsReq has no field or method Limit)
return nil, errs.ErrArgs.WrapMsg("request limit error")
}
docs, err := m.MsgDatabase.GetRandBeforeMsg(ctx, req.Timestamp, int(req.Limit))

Check failure on line 33 in internal/rpc/msg/clear.go

View workflow job for this annotation

GitHub Actions / Benchmark Test with go 1.22.x on ubuntu-latest

req.Limit undefined (type *"github.com/openimsdk/protocol/msg".DestructMsgsReq has no field or method Limit)

Check failure on line 33 in internal/rpc/msg/clear.go

View workflow job for this annotation

GitHub Actions / Benchmark Test with go 1.22.x on ubuntu-latest

req.Limit undefined (type *"github.com/openimsdk/protocol/msg".DestructMsgsReq has no field or method Limit)

Check failure on line 33 in internal/rpc/msg/clear.go

View workflow job for this annotation

GitHub Actions / Benchmark Test with go 1.22.x on ubuntu-latest

req.Limit undefined (type *"github.com/openimsdk/protocol/msg".DestructMsgsReq has no field or method Limit)

Check failure on line 33 in internal/rpc/msg/clear.go

View workflow job for this annotation

GitHub Actions / Test with go 1.21.x on ubuntu-latest

req.Limit undefined (type *"github.com/openimsdk/protocol/msg".DestructMsgsReq has no field or method Limit)

Check failure on line 33 in internal/rpc/msg/clear.go

View workflow job for this annotation

GitHub Actions / Test with go 1.21.x on ubuntu-latest

req.Limit undefined (type *"github.com/openimsdk/protocol/msg".DestructMsgsReq has no field or method Limit)

Check failure on line 33 in internal/rpc/msg/clear.go

View workflow job for this annotation

GitHub Actions / Test with go 1.21.x on ubuntu-latest

req.Limit undefined (type *"github.com/openimsdk/protocol/msg".DestructMsgsReq has no field or method Limit)

Check failure on line 33 in internal/rpc/msg/clear.go

View workflow job for this annotation

GitHub Actions / Test with go 1.22.x on ubuntu-latest

req.Limit undefined (type *"github.com/openimsdk/protocol/msg".DestructMsgsReq has no field or method Limit)

Check failure on line 33 in internal/rpc/msg/clear.go

View workflow job for this annotation

GitHub Actions / Test with go 1.22.x on ubuntu-latest

req.Limit undefined (type *"github.com/openimsdk/protocol/msg".DestructMsgsReq has no field or method Limit)

Check failure on line 33 in internal/rpc/msg/clear.go

View workflow job for this annotation

GitHub Actions / Test with go 1.22.x on ubuntu-latest

req.Limit undefined (type *"github.com/openimsdk/protocol/msg".DestructMsgsReq has no field or method Limit)
if err != nil {
return nil, err
}
for _, doc := range docs {
if err := m.MsgDatabase.DeleteDoc(ctx, doc.DocID); err != nil {
return nil, err
}
if len(msgs) == 0 {
return false, nil
index := strings.LastIndex(doc.DocID, ":")
if index < 0 {
continue
}

for _, msg := range msgs {
index, err := m.MsgDatabase.DeleteDocMsgBefore(ctx, req.Timestamp, msg)
if err != nil {
return false, err
var minSeq int64
for _, model := range doc.Msg {
if model.Msg == nil {
continue
}
if len(index) == 0 {
return false, errs.ErrInternalServer.WrapMsg("delete doc msg failed")
if model.Msg.Seq > minSeq {
minSeq = model.Msg.Seq
}

docNum++
msgNum += len(index)
}

return true, nil
}

_, err = destructMsg(ctx)
if err != nil {
log.ZError(ctx, "clear msg failed", err, "docNum", docNum, "msgNum", msgNum, "cost", time.Since(start))
return nil, err
if minSeq <= 0 {
continue
}
conversationID := doc.DocID[:index]
if conversationID == "" {
continue
}
if err := m.MsgDatabase.SetMinSeq(ctx, conversationID, minSeq); err != nil {
return nil, err
}
}

log.ZDebug(ctx, "clearing message", "docNum", docNum, "msgNum", msgNum, "cost", time.Since(start))

return &msg.DestructMsgsResp{}, nil
return &msg.DestructMsgsResp{Count: int32(len(docs))}, nil

Check failure on line 65 in internal/rpc/msg/clear.go

View workflow job for this annotation

GitHub Actions / Benchmark Test with go 1.22.x on ubuntu-latest

unknown field Count in struct literal of type "github.com/openimsdk/protocol/msg".DestructMsgsResp

Check failure on line 65 in internal/rpc/msg/clear.go

View workflow job for this annotation

GitHub Actions / Benchmark Test with go 1.22.x on ubuntu-latest

unknown field Count in struct literal of type "github.com/openimsdk/protocol/msg".DestructMsgsResp

Check failure on line 65 in internal/rpc/msg/clear.go

View workflow job for this annotation

GitHub Actions / Test with go 1.21.x on ubuntu-latest

unknown field Count in struct literal of type "github.com/openimsdk/protocol/msg".DestructMsgsResp

Check failure on line 65 in internal/rpc/msg/clear.go

View workflow job for this annotation

GitHub Actions / Test with go 1.21.x on ubuntu-latest

unknown field Count in struct literal of type "github.com/openimsdk/protocol/msg".DestructMsgsResp

Check failure on line 65 in internal/rpc/msg/clear.go

View workflow job for this annotation

GitHub Actions / Test with go 1.21.x on ubuntu-latest

unknown field Count in struct literal of type "github.com/openimsdk/protocol/msg".DestructMsgsResp

Check failure on line 65 in internal/rpc/msg/clear.go

View workflow job for this annotation

GitHub Actions / Test with go 1.22.x on ubuntu-latest

unknown field Count in struct literal of type "github.com/openimsdk/protocol/msg".DestructMsgsResp

Check failure on line 65 in internal/rpc/msg/clear.go

View workflow job for this annotation

GitHub Actions / Test with go 1.22.x on ubuntu-latest

unknown field Count in struct literal of type "github.com/openimsdk/protocol/msg".DestructMsgsResp

Check failure on line 65 in internal/rpc/msg/clear.go

View workflow job for this annotation

GitHub Actions / Test with go 1.22.x on ubuntu-latest

unknown field Count in struct literal of type "github.com/openimsdk/protocol/msg".DestructMsgsResp
}

// soft delete for user self
Expand Down
6 changes: 5 additions & 1 deletion internal/rpc/third/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"encoding/base64"
"encoding/hex"
"encoding/json"
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
"path"
"strconv"
"time"
Expand Down Expand Up @@ -284,10 +285,13 @@ func (t *thirdServer) apiAddress(prefix, name string) string {
}

func (t *thirdServer) DeleteOutdatedData(ctx context.Context, req *third.DeleteOutdatedDataReq) (*third.DeleteOutdatedDataResp, error) {
if err := authverify.CheckAdmin(ctx, t.config.Share.IMAdminUserID); err != nil {
return nil, err
}
engine := t.config.RpcConfig.Object.Enable
expireTime := time.UnixMilli(req.ExpireTime)
// Find all expired data in S3 database
models, err := t.s3dataBase.FindExpirationObject(ctx, engine, expireTime, req.ObjectGroup, int64(req.Count))
models, err := t.s3dataBase.FindExpirationObject(ctx, engine, expireTime, req.ObjectGroup, int64(req.Limit))

Check failure on line 294 in internal/rpc/third/s3.go

View workflow job for this annotation

GitHub Actions / Benchmark Test with go 1.22.x on ubuntu-latest

req.Limit undefined (type *"github.com/openimsdk/protocol/third".DeleteOutdatedDataReq has no field or method Limit)

Check failure on line 294 in internal/rpc/third/s3.go

View workflow job for this annotation

GitHub Actions / Benchmark Test with go 1.22.x on ubuntu-latest

req.Limit undefined (type *"github.com/openimsdk/protocol/third".DeleteOutdatedDataReq has no field or method Limit)

Check failure on line 294 in internal/rpc/third/s3.go

View workflow job for this annotation

GitHub Actions / Test with go 1.21.x on ubuntu-latest

req.Limit undefined (type *"github.com/openimsdk/protocol/third".DeleteOutdatedDataReq has no field or method Limit)
if err != nil {
return nil, err
}
Expand Down
25 changes: 19 additions & 6 deletions internal/tools/msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,25 @@ import (
func (c *cronServer) deleteMsg() {
now := time.Now()
deltime := now.Add(-time.Hour * 24 * time.Duration(c.config.CronTask.RetainChatRecords))
ctx := mcontext.SetOperationID(c.ctx, fmt.Sprintf("cron_%d_%d", os.Getpid(), deltime.UnixMilli()))
operationID := fmt.Sprintf("cron_msg_%d_%d", os.Getpid(), deltime.UnixMilli())
ctx := mcontext.SetOperationID(c.ctx, operationID)
log.ZDebug(ctx, "Destruct chat records", "deltime", deltime, "timestamp", deltime.UnixMilli())

if _, err := c.msgClient.DestructMsgs(ctx, &msg.DestructMsgsReq{Timestamp: deltime.UnixMilli()}); err != nil {
log.ZError(ctx, "cron destruct chat records failed", err, "deltime", deltime, "cont", time.Since(now))
return
const (
deleteCount = 20
deleteLimit = 50
)
var count int
for i := 1; i <= deleteCount; i++ {
ctx := mcontext.SetOperationID(c.ctx, fmt.Sprintf("%s_%d", operationID, i))
resp, err := c.msgClient.DestructMsgs(ctx, &msg.DestructMsgsReq{Timestamp: deltime.UnixMilli(), Limit: deleteLimit})
if err != nil {
log.ZError(ctx, "cron destruct chat records failed", err)
break
}
count += int(resp.Count)
if resp.Count <= deleteLimit {
break
}
}
log.ZDebug(ctx, "cron destruct chat records success", "deltime", deltime, "cont", time.Since(now))
log.ZDebug(ctx, "cron destruct chat records end", "deltime", deltime, "cont", time.Since(now), "deleteDocs", count)
}
11 changes: 6 additions & 5 deletions internal/tools/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,21 @@ func (c *cronServer) clearS3() {
// number of pagination. if need modify, need update value in third.DeleteOutdatedData
pageShowNumber := 500
deleteTime := start.Add(-time.Hour * 24 * time.Duration(c.config.CronTask.FileExpireTime))
operationID := fmt.Sprintf("cron_%d_%d", os.Getpid(), deleteTime.UnixMilli())
operationID := fmt.Sprintf("cron_s3_%d_%d", os.Getpid(), deleteTime.UnixMilli())
ctx := mcontext.SetOperationID(c.ctx, operationID)
log.ZDebug(ctx, "deleteoutDatedData", "deletetime", deleteTime, "timestamp", deleteTime.UnixMilli())
var count int
for i := 1; i <= executeNum; i++ {
ctx := mcontext.SetOperationID(c.ctx, fmt.Sprintf("%s_%d", operationID, i))
resp, err := c.thirdClient.DeleteOutdatedData(ctx, &third.DeleteOutdatedDataReq{ExpireTime: deleteTime.UnixMilli(), ObjectGroup: c.config.CronTask.DeleteObjectType, Count: int32(pageShowNumber)})
resp, err := c.thirdClient.DeleteOutdatedData(ctx, &third.DeleteOutdatedDataReq{ExpireTime: deleteTime.UnixMilli(), ObjectGroup: c.config.CronTask.DeleteObjectType, Limit: int32(pageShowNumber)})
if err != nil {
log.ZError(ctx, "cron deleteoutDatedData failed", err, "deleteTime", deleteTime, "cont", time.Since(start))
log.ZError(ctx, "cron deleteoutDatedData failed", err)
return
}
count += int(resp.Count)
if resp.Count < int32(pageShowNumber) {
break
}
}

log.ZDebug(ctx, "cron deleteoutDatedData success", "deltime", deleteTime, "cont", time.Since(start))
log.ZDebug(ctx, "cron deleteoutDatedData success", "deltime", deleteTime, "cont", time.Since(start), "count", count)
}
6 changes: 3 additions & 3 deletions internal/tools/user_msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ import (

func (c *cronServer) clearUserMsg() {
now := time.Now()
ctx := mcontext.SetOperationID(c.ctx, fmt.Sprintf("cron_%d_%d", os.Getpid(), now.UnixMilli()))
log.ZDebug(ctx, "clear msg cron start", "now", now)

operationID := fmt.Sprintf("cron_user_msg_%d_%d", os.Getpid(), now.UnixMilli())
ctx := mcontext.SetOperationID(c.ctx, operationID)
log.ZDebug(ctx, "clear msg cron start")
conversations, err := c.conversationClient.GetConversationsNeedClearMsg(ctx, &pbconversation.GetConversationsNeedClearMsgReq{})
if err != nil {
log.ZError(ctx, "Get conversation need Destruct msgs failed.", err)
Expand Down
104 changes: 47 additions & 57 deletions pkg/common/storage/controller/msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"context"
"encoding/json"
"errors"
"strings"
"time"

"github.com/openimsdk/open-im-server/v3/pkg/common/storage/database"
Expand Down Expand Up @@ -69,6 +68,7 @@ type CommonMsgDatabase interface {
GetMaxSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error)
GetMaxSeq(ctx context.Context, conversationID string) (int64, error)
SetMinSeqs(ctx context.Context, seqs map[string]int64) error
SetMinSeq(ctx context.Context, conversationID string, seq int64) error

SetUserConversationsMinSeqs(ctx context.Context, userID string, seqs map[string]int64) (err error)
SetHasReadSeq(ctx context.Context, userID string, conversationID string, hasReadSeq int64) error
Expand All @@ -95,13 +95,14 @@ type CommonMsgDatabase interface {
ConvertMsgsDocLen(ctx context.Context, conversationIDs []string)

// get Msg when destruct msg before
GetBeforeMsg(ctx context.Context, ts int64, docIds []string, limit int) ([]*model.MsgDocModel, error)
DeleteDocMsgBefore(ctx context.Context, ts int64, doc *model.MsgDocModel) ([]int, error)
//DeleteDocMsgBefore(ctx context.Context, ts int64, doc *model.MsgDocModel) ([]int, error)

GetDocIDs(ctx context.Context) ([]string, error)
GetRandBeforeMsg(ctx context.Context, ts int64, limit int) ([]*model.MsgDocModel, error)

SetUserConversationsMaxSeq(ctx context.Context, conversationID string, userID string, seq int64) error
SetUserConversationsMinSeq(ctx context.Context, conversationID string, userID string, seq int64) error

DeleteDoc(ctx context.Context, docID string) error
}

func NewCommonMsgDatabase(msgDocModel database.Msg, msg cache.MsgCache, seqUser cache.SeqUser, seqConversation cache.SeqConversationCache, kafkaConf *config.Kafka) (CommonMsgDatabase, error) {
Expand Down Expand Up @@ -806,9 +807,10 @@ func (db *commonMsgDatabase) GetMaxSeq(ctx context.Context, conversationID strin
return db.seqConversation.GetMaxSeq(ctx, conversationID)
}

func (db *commonMsgDatabase) SetMinSeq(ctx context.Context, conversationID string, minSeq int64) error {
return db.seqConversation.SetMinSeq(ctx, conversationID, minSeq)
}
//
//func (db *commonMsgDatabase) SetMinSeq(ctx context.Context, conversationID string, minSeq int64) error {
// return db.seqConversation.SetMinSeq(ctx, conversationID, minSeq)
//}

func (db *commonMsgDatabase) SetMinSeqs(ctx context.Context, seqs map[string]int64) error {
return db.seqConversation.SetMinSeqs(ctx, seqs)
Expand Down Expand Up @@ -947,56 +949,40 @@ func (db *commonMsgDatabase) ConvertMsgsDocLen(ctx context.Context, conversation
db.msgDocDatabase.ConvertMsgsDocLen(ctx, conversationIDs)
}

func (db *commonMsgDatabase) GetBeforeMsg(ctx context.Context, ts int64, docIDs []string, limit int) ([]*model.MsgDocModel, error) {
var msgs []*model.MsgDocModel
for i := 0; i < len(docIDs); i += 1000 {
end := i + 1000
if end > len(docIDs) {
end = len(docIDs)
}

res, err := db.msgDocDatabase.GetBeforeMsg(ctx, ts, docIDs[i:end], limit)
if err != nil {
return nil, err
}
msgs = append(msgs, res...)

if len(msgs) >= limit {
return msgs[:limit], nil
}
}
return msgs, nil
}

func (db *commonMsgDatabase) DeleteDocMsgBefore(ctx context.Context, ts int64, doc *model.MsgDocModel) ([]int, error) {
var notNull int
index := make([]int, 0, len(doc.Msg))
for i, message := range doc.Msg {
if message.Msg != nil {
notNull++
if message.Msg.SendTime < ts {
index = append(index, i)
}
}
}
if len(index) == 0 {
return index, nil
}
maxSeq := doc.Msg[index[len(index)-1]].Msg.Seq
conversationID := doc.DocID[:strings.LastIndex(doc.DocID, ":")]
if err := db.setMinSeq(ctx, conversationID, maxSeq+1); err != nil {
return index, err
}
if len(index) == notNull {
log.ZDebug(ctx, "Delete db in Doc", "DocID", doc.DocID, "index", index, "maxSeq", maxSeq)
return index, db.msgDocDatabase.DeleteDoc(ctx, doc.DocID)
} else {
log.ZDebug(ctx, "delete db in index", "DocID", doc.DocID, "index", index, "maxSeq", maxSeq)
return index, db.msgDocDatabase.DeleteMsgByIndex(ctx, doc.DocID, index)
}
func (db *commonMsgDatabase) GetRandBeforeMsg(ctx context.Context, ts int64, limit int) ([]*model.MsgDocModel, error) {
return db.msgDocDatabase.GetRandBeforeMsg(ctx, ts, limit)
}

func (db *commonMsgDatabase) setMinSeq(ctx context.Context, conversationID string, seq int64) error {
//
//func (db *commonMsgDatabase) DeleteDocMsgBefore(ctx context.Context, ts int64, doc *model.MsgDocModel) ([]int, error) {
// var notNull int
// index := make([]int, 0, len(doc.Msg))
// for i, message := range doc.Msg {
// if message.Msg != nil {
// notNull++
// if message.Msg.SendTime < ts {
// index = append(index, i)
// }
// }
// }
// if len(index) == 0 {
// return index, nil
// }
// maxSeq := doc.Msg[index[len(index)-1]].Msg.Seq
// conversationID := doc.DocID[:strings.LastIndex(doc.DocID, ":")]
// if err := db.SetMinSeq(ctx, conversationID, maxSeq+1); err != nil {
// return index, err
// }
// if len(index) == notNull {
// log.ZDebug(ctx, "Delete db in Doc", "DocID", doc.DocID, "index", index, "maxSeq", maxSeq)
// return index, db.msgDocDatabase.DeleteDoc(ctx, doc.DocID)
// } else {
// log.ZDebug(ctx, "delete db in index", "DocID", doc.DocID, "index", index, "maxSeq", maxSeq)
// return index, db.msgDocDatabase.DeleteMsgByIndex(ctx, doc.DocID, index)
// }
//}

func (db *commonMsgDatabase) SetMinSeq(ctx context.Context, conversationID string, seq int64) error {
dbSeq, err := db.seqConversation.GetMinSeq(ctx, conversationID)
if err != nil {
if errors.Is(errs.Unwrap(err), redis.Nil) {
Expand All @@ -1010,8 +996,8 @@ func (db *commonMsgDatabase) setMinSeq(ctx context.Context, conversationID strin
return db.seqConversation.SetMinSeq(ctx, conversationID, seq)
}

func (db *commonMsgDatabase) GetDocIDs(ctx context.Context) ([]string, error) {
return db.msgDocDatabase.GetDocIDs(ctx)
func (db *commonMsgDatabase) GetRandDocIDs(ctx context.Context, limit int) ([]string, error) {
return db.msgDocDatabase.GetRandDocIDs(ctx, limit)
}

func (db *commonMsgDatabase) GetCacheMaxSeqWithTime(ctx context.Context, conversationIDs []string) (map[string]database.SeqTime, error) {
Expand All @@ -1026,3 +1012,7 @@ func (db *commonMsgDatabase) GetMaxSeqsWithTime(ctx context.Context, conversatio
// todo: only the time in the redis cache will be taken, not the message time
return db.seqConversation.GetMaxSeqsWithTime(ctx, conversationIDs)
}

func (db *commonMsgDatabase) DeleteDoc(ctx context.Context, docID string) error {
return db.msgDocDatabase.DeleteDoc(ctx, docID)
}
20 changes: 11 additions & 9 deletions pkg/common/storage/database/mgo/msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -1227,8 +1227,7 @@ func (m *MsgMgo) ConvertMsgsDocLen(ctx context.Context, conversationIDs []string
}
}

func (m *MsgMgo) GetDocIDs(ctx context.Context) ([]string, error) {
limit := 5000
func (m *MsgMgo) GetRandDocIDs(ctx context.Context, limit int) ([]string, error) {
var skip int
var docIDs []string
var offset int
Expand Down Expand Up @@ -1267,15 +1266,18 @@ func (m *MsgMgo) GetDocIDs(ctx context.Context) ([]string, error) {
return docIDs, errs.Wrap(err)
}

func (m *MsgMgo) GetBeforeMsg(ctx context.Context, ts int64, docIDs []string, limit int) ([]*model.MsgDocModel, error) {
func (m *MsgMgo) GetRandBeforeMsg(ctx context.Context, ts int64, limit int) ([]*model.MsgDocModel, error) {
return mongoutil.Aggregate[*model.MsgDocModel](ctx, m.coll, []bson.M{
{
"$match": bson.M{
"doc_id": bson.M{
"$in": docIDs,
},
"msgs.msg.send_time": bson.M{
"$lt": ts,
"msgs": bson.M{
"$not": bson.M{
"$elemMatch": bson.M{
"msg.send_time": bson.M{
"$gt": ts,
},
},
},
},
},
},
Expand All @@ -1288,7 +1290,7 @@ func (m *MsgMgo) GetBeforeMsg(ctx context.Context, ts int64, docIDs []string, li
},
},
{
"$limit": limit,
"$sample": limit,
},
})
}
Expand Down
Loading

0 comments on commit 705bc37

Please sign in to comment.