Skip to content

Commit

Permalink
refactoring scheduled tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
withchao committed Dec 19, 2024
1 parent 705bc37 commit 8807597
Show file tree
Hide file tree
Showing 13 changed files with 247 additions and 95 deletions.
16 changes: 16 additions & 0 deletions internal/api/router.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package api

import (
"context"
"fmt"
"github.com/openimsdk/protocol/user"
"net/http"
"strings"

Expand Down Expand Up @@ -164,6 +166,7 @@ func newGinRouter(disCov discovery.SvcDiscoveryRegistry, config *Config) *gin.En
authRouterGroup.POST("/get_user_token", a.GetUserToken)
authRouterGroup.POST("/parse_token", a.ParseToken)
authRouterGroup.POST("/force_logout", a.ForceLogout)

}
// Third service
thirdGroup := r.Group("/third")
Expand Down Expand Up @@ -297,3 +300,16 @@ var Whitelist = []string{
"/auth/get_admin_token",
"/auth/parse_token",
}

func init() {

var uc user.UserClient
var g *gin.Engine
g.POST("/get_admin_token", New(uc, uc.AccountCheck))

Check failure on line 308 in internal/api/router.go

View workflow job for this annotation

GitHub Actions / Benchmark Test with go 1.22.x on ubuntu-latest

in call to New, type func(ctx "context".Context, in *"github.com/openimsdk/protocol/user".AccountCheckReq, opts ...grpc.CallOption) (*"github.com/openimsdk/protocol/user".AccountCheckResp, error) of uc.AccountCheck does not match inferred type func(c "github.com/openimsdk/protocol/user".UserClient, ctx "context".Context, req *A, opts ...grpc.CallOption) (*B, error) for func(c C, ctx "context".Context, req *A, opts ...grpc.CallOption) (*B, error)

Check failure on line 308 in internal/api/router.go

View workflow job for this annotation

GitHub Actions / Benchmark Test with go 1.22.x on ubuntu-latest

in call to New, type func(ctx "context".Context, in *"github.com/openimsdk/protocol/user".AccountCheckReq, opts ...grpc.CallOption) (*"github.com/openimsdk/protocol/user".AccountCheckResp, error) of uc.AccountCheck does not match inferred type func(c "github.com/openimsdk/protocol/user".UserClient, ctx "context".Context, req *A, opts ...grpc.CallOption) (*B, error) for func(c C, ctx "context".Context, req *A, opts ...grpc.CallOption) (*B, error)

Check failure on line 308 in internal/api/router.go

View workflow job for this annotation

GitHub Actions / Benchmark Test with go 1.22.x on ubuntu-latest

in call to New, type func(ctx "context".Context, in *"github.com/openimsdk/protocol/user".AccountCheckReq, opts ...grpc.CallOption) (*"github.com/openimsdk/protocol/user".AccountCheckResp, error) of uc.AccountCheck does not match inferred type func(c "github.com/openimsdk/protocol/user".UserClient, ctx "context".Context, req *A, opts ...grpc.CallOption) (*B, error) for func(c C, ctx "context".Context, req *A, opts ...grpc.CallOption) (*B, error)

Check failure on line 308 in internal/api/router.go

View workflow job for this annotation

GitHub Actions / Benchmark Test with go 1.22.x on ubuntu-latest

in call to New, type func(ctx "context".Context, in *"github.com/openimsdk/protocol/user".AccountCheckReq, opts ...grpc.CallOption) (*"github.com/openimsdk/protocol/user".AccountCheckResp, error) of uc.AccountCheck does not match inferred type func(c "github.com/openimsdk/protocol/user".UserClient, ctx "context".Context, req *A, opts ...grpc.CallOption) (*B, error) for func(c C, ctx "context".Context, req *A, opts ...grpc.CallOption) (*B, error)

Check failure on line 308 in internal/api/router.go

View workflow job for this annotation

GitHub Actions / Test with go 1.21.x on ubuntu-latest

in call to New, type func(ctx "context".Context, in *"github.com/openimsdk/protocol/user".AccountCheckReq, opts ...grpc.CallOption) (*"github.com/openimsdk/protocol/user".AccountCheckResp, error) of uc.AccountCheck does not match inferred type func(c "github.com/openimsdk/protocol/user".UserClient, ctx "context".Context, req *A, opts ...grpc.CallOption) (*B, error) for func(c C, ctx "context".Context, req *A, opts ...grpc.CallOption) (*B, error)

Check failure on line 308 in internal/api/router.go

View workflow job for this annotation

GitHub Actions / Test with go 1.21.x on ubuntu-latest

in call to New, type func(ctx "context".Context, in *"github.com/openimsdk/protocol/user".AccountCheckReq, opts ...grpc.CallOption) (*"github.com/openimsdk/protocol/user".AccountCheckResp, error) of uc.AccountCheck does not match inferred type func(c "github.com/openimsdk/protocol/user".UserClient, ctx "context".Context, req *A, opts ...grpc.CallOption) (*B, error) for func(c C, ctx "context".Context, req *A, opts ...grpc.CallOption) (*B, error)

Check failure on line 308 in internal/api/router.go

View workflow job for this annotation

GitHub Actions / Test with go 1.21.x on ubuntu-latest

in call to New, type func(ctx "context".Context, in *"github.com/openimsdk/protocol/user".AccountCheckReq, opts ...grpc.CallOption) (*"github.com/openimsdk/protocol/user".AccountCheckResp, error) of uc.AccountCheck does not match inferred type func(c "github.com/openimsdk/protocol/user".UserClient, ctx "context".Context, req *A, opts ...grpc.CallOption) (*B, error) for func(c C, ctx "context".Context, req *A, opts ...grpc.CallOption) (*B, error)

Check failure on line 308 in internal/api/router.go

View workflow job for this annotation

GitHub Actions / Test with go 1.21.x on ubuntu-latest

in call to New, type func(ctx "context".Context, in *"github.com/openimsdk/protocol/user".AccountCheckReq, opts ...grpc.CallOption) (*"github.com/openimsdk/protocol/user".AccountCheckResp, error) of uc.AccountCheck does not match inferred type func(c "github.com/openimsdk/protocol/user".UserClient, ctx "context".Context, req *A, opts ...grpc.CallOption) (*B, error) for func(c C, ctx "context".Context, req *A, opts ...grpc.CallOption) (*B, error)

Check failure on line 308 in internal/api/router.go

View workflow job for this annotation

GitHub Actions / Test with go 1.22.x on ubuntu-latest

in call to New, type func(ctx "context".Context, in *"github.com/openimsdk/protocol/user".AccountCheckReq, opts ...grpc.CallOption) (*"github.com/openimsdk/protocol/user".AccountCheckResp, error) of uc.AccountCheck does not match inferred type func(c "github.com/openimsdk/protocol/user".UserClient, ctx "context".Context, req *A, opts ...grpc.CallOption) (*B, error) for func(c C, ctx "context".Context, req *A, opts ...grpc.CallOption) (*B, error)

Check failure on line 308 in internal/api/router.go

View workflow job for this annotation

GitHub Actions / Test with go 1.22.x on ubuntu-latest

in call to New, type func(ctx "context".Context, in *"github.com/openimsdk/protocol/user".AccountCheckReq, opts ...grpc.CallOption) (*"github.com/openimsdk/protocol/user".AccountCheckResp, error) of uc.AccountCheck does not match inferred type func(c "github.com/openimsdk/protocol/user".UserClient, ctx "context".Context, req *A, opts ...grpc.CallOption) (*B, error) for func(c C, ctx "context".Context, req *A, opts ...grpc.CallOption) (*B, error)

Check failure on line 308 in internal/api/router.go

View workflow job for this annotation

GitHub Actions / Test with go 1.22.x on ubuntu-latest

in call to New, type func(ctx "context".Context, in *"github.com/openimsdk/protocol/user".AccountCheckReq, opts ...grpc.CallOption) (*"github.com/openimsdk/protocol/user".AccountCheckResp, error) of uc.AccountCheck does not match inferred type func(c "github.com/openimsdk/protocol/user".UserClient, ctx "context".Context, req *A, opts ...grpc.CallOption) (*B, error) for func(c C, ctx "context".Context, req *A, opts ...grpc.CallOption) (*B, error)

Check failure on line 308 in internal/api/router.go

View workflow job for this annotation

GitHub Actions / Test with go 1.22.x on ubuntu-latest

in call to New, type func(ctx "context".Context, in *"github.com/openimsdk/protocol/user".AccountCheckReq, opts ...grpc.CallOption) (*"github.com/openimsdk/protocol/user".AccountCheckResp, error) of uc.AccountCheck does not match inferred type func(c "github.com/openimsdk/protocol/user".UserClient, ctx "context".Context, req *A, opts ...grpc.CallOption) (*B, error) for func(c C, ctx "context".Context, req *A, opts ...grpc.CallOption) (*B, error)

}

func New[A, B, C any](c C, fn func(c C, ctx context.Context, req *A, opts ...grpc.CallOption) (*B, error)) func(c *gin.Context) {

return nil
}
29 changes: 29 additions & 0 deletions internal/rpc/conversation/conversation.go
Original file line number Diff line number Diff line change
Expand Up @@ -763,3 +763,32 @@ func (c *conversationServer) GetPinnedConversationIDs(ctx context.Context, req *
}
return &pbconversation.GetPinnedConversationIDsResp{ConversationIDs: conversationIDs}, nil
}

func (c *conversationServer) ClearUserConversationMsg(ctx context.Context, req *pbconversation.ClearUserConversationMsgReq) (*pbconversation.ClearUserConversationMsgResp, error) {

Check failure on line 767 in internal/rpc/conversation/conversation.go

View workflow job for this annotation

GitHub Actions / Benchmark Test with go 1.22.x on ubuntu-latest

undefined: pbconversation.ClearUserConversationMsgReq

Check failure on line 767 in internal/rpc/conversation/conversation.go

View workflow job for this annotation

GitHub Actions / Benchmark Test with go 1.22.x on ubuntu-latest

undefined: pbconversation.ClearUserConversationMsgResp

Check failure on line 767 in internal/rpc/conversation/conversation.go

View workflow job for this annotation

GitHub Actions / Benchmark Test with go 1.22.x on ubuntu-latest

undefined: pbconversation.ClearUserConversationMsgReq

Check failure on line 767 in internal/rpc/conversation/conversation.go

View workflow job for this annotation

GitHub Actions / Test with go 1.21.x on ubuntu-latest

undefined: pbconversation.ClearUserConversationMsgReq

Check failure on line 767 in internal/rpc/conversation/conversation.go

View workflow job for this annotation

GitHub Actions / Test with go 1.21.x on ubuntu-latest

undefined: pbconversation.ClearUserConversationMsgResp

Check failure on line 767 in internal/rpc/conversation/conversation.go

View workflow job for this annotation

GitHub Actions / Test with go 1.21.x on ubuntu-latest

undefined: pbconversation.ClearUserConversationMsgReq

Check failure on line 767 in internal/rpc/conversation/conversation.go

View workflow job for this annotation

GitHub Actions / Test with go 1.22.x on ubuntu-latest

undefined: pbconversation.ClearUserConversationMsgReq

Check failure on line 767 in internal/rpc/conversation/conversation.go

View workflow job for this annotation

GitHub Actions / Test with go 1.22.x on ubuntu-latest

undefined: pbconversation.ClearUserConversationMsgResp

Check failure on line 767 in internal/rpc/conversation/conversation.go

View workflow job for this annotation

GitHub Actions / Test with go 1.22.x on ubuntu-latest

undefined: pbconversation.ClearUserConversationMsgReq
conversations, err := c.conversationDatabase.FindRandConversation(ctx, req.Timestamp, int(req.Limit))
if err != nil {
return nil, err
}
for _, conversation := range conversations {
if conversation.IsMsgDestruct == false || conversation.MsgDestructTime == 0 {
continue
}
rcpReq := &pbmsg.GetLastMessageSeqByTimeReq{ConversationID: conversation.ConversationID, Time: req.Timestamp - conversation.MsgDestructTime}

Check failure on line 776 in internal/rpc/conversation/conversation.go

View workflow job for this annotation

GitHub Actions / Benchmark Test with go 1.22.x on ubuntu-latest

undefined: pbmsg.GetLastMessageSeqByTimeReq

Check failure on line 776 in internal/rpc/conversation/conversation.go

View workflow job for this annotation

GitHub Actions / Test with go 1.21.x on ubuntu-latest

undefined: pbmsg.GetLastMessageSeqByTimeReq

Check failure on line 776 in internal/rpc/conversation/conversation.go

View workflow job for this annotation

GitHub Actions / Test with go 1.22.x on ubuntu-latest

undefined: pbmsg.GetLastMessageSeqByTimeReq
resp, err := pbmsg.GetLastMessageSeqByTime.Invoke(ctx, rcpReq)

Check failure on line 777 in internal/rpc/conversation/conversation.go

View workflow job for this annotation

GitHub Actions / Benchmark Test with go 1.22.x on ubuntu-latest

undefined: pbmsg.GetLastMessageSeqByTime

Check failure on line 777 in internal/rpc/conversation/conversation.go

View workflow job for this annotation

GitHub Actions / Test with go 1.21.x on ubuntu-latest

undefined: pbmsg.GetLastMessageSeqByTime

Check failure on line 777 in internal/rpc/conversation/conversation.go

View workflow job for this annotation

GitHub Actions / Test with go 1.22.x on ubuntu-latest

undefined: pbmsg.GetLastMessageSeqByTime
if err != nil {
return nil, err
}
if resp.Seq == 0 {
continue
}
_, err = c.SetConversationMinSeq(ctx, &pbconversation.SetConversationMinSeqReq{
ConversationID: conversation.ConversationID,
OwnerUserID: []string{conversation.OwnerUserID},
MinSeq: resp.Seq + 1,
})
if err != nil {
return nil, err
}
}
return &pbconversation.ClearUserConversationMsgResp{}, nil

Check failure on line 793 in internal/rpc/conversation/conversation.go

View workflow job for this annotation

GitHub Actions / Benchmark Test with go 1.22.x on ubuntu-latest

undefined: pbconversation.ClearUserConversationMsgResp

Check failure on line 793 in internal/rpc/conversation/conversation.go

View workflow job for this annotation

GitHub Actions / Test with go 1.21.x on ubuntu-latest

undefined: pbconversation.ClearUserConversationMsgResp

Check failure on line 793 in internal/rpc/conversation/conversation.go

View workflow job for this annotation

GitHub Actions / Test with go 1.22.x on ubuntu-latest

undefined: pbconversation.ClearUserConversationMsgResp
}
19 changes: 10 additions & 9 deletions internal/rpc/msg/clear.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
pbconv "github.com/openimsdk/protocol/conversation"
"github.com/openimsdk/protocol/msg"
"github.com/openimsdk/protocol/wrapperspb"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/mcontext"
"github.com/openimsdk/tools/utils/datautil"
Expand All @@ -20,16 +19,10 @@ import (
)

// hard delete in Database.
func (m *msgServer) DestructMsgs(ctx context.Context, req *msg.DestructMsgsReq) (_ *msg.DestructMsgsResp, err error) {
func (m *msgServer) DestructMsgs(ctx context.Context, req *msg.DestructMsgsReq) (*msg.DestructMsgsResp, error) {
if err := authverify.CheckAdmin(ctx, m.config.Share.IMAdminUserID); err != nil {
return nil, err
}
if req.Timestamp > time.Now().UnixMilli() {
return nil, errs.ErrArgs.WrapMsg("request millisecond timestamp error")
}
if req.Limit <= 0 {
return nil, errs.ErrArgs.WrapMsg("request limit error")
}
docs, err := m.MsgDatabase.GetRandBeforeMsg(ctx, req.Timestamp, int(req.Limit))
if err != nil {
return nil, err
Expand Down Expand Up @@ -66,7 +59,7 @@ func (m *msgServer) DestructMsgs(ctx context.Context, req *msg.DestructMsgsReq)
}

// soft delete for user self
func (m *msgServer) ClearMsg(ctx context.Context, req *msg.ClearMsgReq) (_ *msg.ClearMsgResp, err error) {
func (m *msgServer) ClearMsg(ctx context.Context, req *msg.ClearMsgReq) (*msg.ClearMsgResp, error) {
temp := convert.ConversationsPb2DB(req.Conversations)

batchNum := 100
Expand Down Expand Up @@ -128,3 +121,11 @@ func (m *msgServer) ClearMsg(ctx context.Context, req *msg.ClearMsgReq) (_ *msg.

return nil, nil
}

func (m *msgServer) GetLastMessageSeqByTime(ctx context.Context, req *msg.GetLastMessageSeqByTimeReq) (*msg.GetLastMessageSeqByTimeResp, error) {
seq, err := m.MsgDatabase.GetLastMessageSeqByTime(ctx, req.ConversationID, req.Time)
if err != nil {
return nil, err
}
return &msg.GetLastMessageSeqByTimeResp{Seq: seq}, nil
}
2 changes: 1 addition & 1 deletion internal/tools/msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ func (c *cronServer) deleteMsg() {
ctx := mcontext.SetOperationID(c.ctx, operationID)
log.ZDebug(ctx, "Destruct chat records", "deltime", deltime, "timestamp", deltime.UnixMilli())
const (
deleteCount = 20
deleteCount = 200
deleteLimit = 50
)
var count int
Expand Down
58 changes: 51 additions & 7 deletions internal/tools/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,25 +11,69 @@ import (

func (c *cronServer) clearS3() {
start := time.Now()
executeNum := 10
// number of pagination. if need modify, need update value in third.DeleteOutdatedData
pageShowNumber := 500
deleteTime := start.Add(-time.Hour * 24 * time.Duration(c.config.CronTask.FileExpireTime))
operationID := fmt.Sprintf("cron_s3_%d_%d", os.Getpid(), deleteTime.UnixMilli())
ctx := mcontext.SetOperationID(c.ctx, operationID)
log.ZDebug(ctx, "deleteoutDatedData", "deletetime", deleteTime, "timestamp", deleteTime.UnixMilli())
const (
deleteCount = 200
deleteLimit = 100
)

var count int
for i := 1; i <= executeNum; i++ {
ctx := mcontext.SetOperationID(c.ctx, fmt.Sprintf("%s_%d", operationID, i))
resp, err := c.thirdClient.DeleteOutdatedData(ctx, &third.DeleteOutdatedDataReq{ExpireTime: deleteTime.UnixMilli(), ObjectGroup: c.config.CronTask.DeleteObjectType, Limit: int32(pageShowNumber)})
for i := 1; i <= deleteCount; i++ {
resp, err := c.thirdClient.DeleteOutdatedData(ctx, &third.DeleteOutdatedDataReq{ExpireTime: deleteTime.UnixMilli(), ObjectGroup: c.config.CronTask.DeleteObjectType, Limit: deleteLimit})
if err != nil {
log.ZError(ctx, "cron deleteoutDatedData failed", err)
return
}
count += int(resp.Count)
if resp.Count < int32(pageShowNumber) {
if resp.Count < deleteLimit {
break
}
}
log.ZDebug(ctx, "cron deleteoutDatedData success", "deltime", deleteTime, "cont", time.Since(start), "count", count)
}

// var req *third.DeleteOutdatedDataReq
// count1, err := ExtractField(ctx, c.thirdClient.DeleteOutdatedData, req, (*third.DeleteOutdatedDataResp).GetCount)
//
// c.thirdClient.DeleteOutdatedData(ctx, &third.DeleteOutdatedDataReq{})
// msggateway.GetUsersOnlineStatusCaller.Invoke(ctx, &msggateway.GetUsersOnlineStatusReq{})
//
// var cli ThirdClient
//
// c111, err := cli.DeleteOutdatedData(ctx, 100)
//
// cli.ThirdClient.DeleteOutdatedData(ctx, &third.DeleteOutdatedDataReq{})
//
// cli.AuthSign(ctx, &third.AuthSignReq{})
//
// cli.SetAppBadge()
//
//}
//
//func extractField[A, B, C any](ctx context.Context, fn func(ctx context.Context, req *A, opts ...grpc.CallOption) (*B, error), req *A, get func(*B) C) (C, error) {
// resp, err := fn(ctx, req)
// if err != nil {
// var c C
// return c, err
// }
// return get(resp), nil
//}
//
//func ignore(_ any, err error) error {
// return err
//}
//
//type ThirdClient struct {
// third.ThirdClient
//}
//
//func (c *ThirdClient) DeleteOutdatedData(ctx context.Context, expireTime int64) (int32, error) {
// return extractField(ctx, c.ThirdClient.DeleteOutdatedData, &third.DeleteOutdatedDataReq{ExpireTime: expireTime}, (*third.DeleteOutdatedDataResp).GetCount)
//}
//
//func (c *ThirdClient) DeleteOutdatedData1(ctx context.Context, expireTime int64) error {
// return ignore(c.ThirdClient.DeleteOutdatedData(ctx, &third.DeleteOutdatedDataReq{ExpireTime: expireTime}))
//}
31 changes: 17 additions & 14 deletions internal/tools/user_msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package tools
import (
"fmt"
pbconversation "github.com/openimsdk/protocol/conversation"
"github.com/openimsdk/protocol/msg"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/mcontext"
"os"
Expand All @@ -14,18 +13,22 @@ func (c *cronServer) clearUserMsg() {
now := time.Now()
operationID := fmt.Sprintf("cron_user_msg_%d_%d", os.Getpid(), now.UnixMilli())
ctx := mcontext.SetOperationID(c.ctx, operationID)
log.ZDebug(ctx, "clear msg cron start")
conversations, err := c.conversationClient.GetConversationsNeedClearMsg(ctx, &pbconversation.GetConversationsNeedClearMsgReq{})
if err != nil {
log.ZError(ctx, "Get conversation need Destruct msgs failed.", err)
return
log.ZDebug(ctx, "clear user msg cron start")
const (
deleteCount = 200
deleteLimit = 100
)
var count int
for i := 1; i <= deleteCount; i++ {
resp, err := c.conversationClient.ClearUserConversationMsg(ctx, &pbconversation.ClearUserConversationMsgReq{Timestamp: now.UnixMilli(), Limit: deleteLimit})
if err != nil {
log.ZError(ctx, "ClearUserConversationMsg failed.", err)
return
}
count += int(resp.Count)
if resp.Count < deleteLimit {
break
}
}

_, err = c.msgClient.ClearMsg(ctx, &msg.ClearMsgReq{Conversations: conversations.Conversations})
if err != nil {
log.ZError(ctx, "Clear Msg failed.", err)
return
}

log.ZDebug(ctx, "clear msg cron task completed", "cont", time.Since(now))
log.ZDebug(ctx, "clear user msg cron task completed", "cont", time.Since(now), "count", count)
}
6 changes: 6 additions & 0 deletions pkg/common/storage/controller/conversation.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ type ConversationDatabase interface {
GetNotNotifyConversationIDs(ctx context.Context, userID string) ([]string, error)
// GetPinnedConversationIDs gets pinned conversationIDs by userID
GetPinnedConversationIDs(ctx context.Context, userID string) ([]string, error)
// FindRandConversation finds random conversations based on the specified timestamp and limit.
FindRandConversation(ctx context.Context, ts int64, limit int) ([]*relationtb.Conversation, error)
}

func NewConversationDatabase(conversation database.Conversation, cache cache.ConversationCache, tx tx.Tx) ConversationDatabase {
Expand Down Expand Up @@ -401,3 +403,7 @@ func (c *conversationDatabase) GetPinnedConversationIDs(ctx context.Context, use
}
return conversationIDs, nil
}

func (c *conversationDatabase) FindRandConversation(ctx context.Context, ts int64, limit int) ([]*relationtb.Conversation, error) {
return c.conversationDB.FindRandConversation(ctx, ts, limit)
}
6 changes: 6 additions & 0 deletions pkg/common/storage/controller/msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ type CommonMsgDatabase interface {
SetUserConversationsMinSeq(ctx context.Context, conversationID string, userID string, seq int64) error

DeleteDoc(ctx context.Context, docID string) error

GetLastMessageSeqByTime(ctx context.Context, conversationID string, time int64) (int64, error)
}

func NewCommonMsgDatabase(msgDocModel database.Msg, msg cache.MsgCache, seqUser cache.SeqUser, seqConversation cache.SeqConversationCache, kafkaConf *config.Kafka) (CommonMsgDatabase, error) {
Expand Down Expand Up @@ -1016,3 +1018,7 @@ func (db *commonMsgDatabase) GetMaxSeqsWithTime(ctx context.Context, conversatio
func (db *commonMsgDatabase) DeleteDoc(ctx context.Context, docID string) error {
return db.msgDocDatabase.DeleteDoc(ctx, docID)
}

func (db *commonMsgDatabase) GetLastMessageSeqByTime(ctx context.Context, conversationID string, time int64) (int64, error) {
return db.msgDocDatabase.GetLastMessageSeqByTime(ctx, conversationID, time)
}
1 change: 1 addition & 0 deletions pkg/common/storage/database/conversation.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,5 @@ type Conversation interface {
GetConversationIDsNeedDestruct(ctx context.Context) ([]*model.Conversation, error)
GetConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) ([]string, error)
FindConversationUserVersion(ctx context.Context, userID string, version uint, limit int) (*model.VersionLog, error)
FindRandConversation(ctx context.Context, ts int64, limit int) ([]*model.Conversation, error)
}
32 changes: 32 additions & 0 deletions pkg/common/storage/database/mgo/conversation.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,3 +228,35 @@ func (c *ConversationMgo) GetConversationNotReceiveMessageUserIDs(ctx context.Co
func (c *ConversationMgo) FindConversationUserVersion(ctx context.Context, userID string, version uint, limit int) (*model.VersionLog, error) {
return c.version.FindChangeLog(ctx, userID, version, limit)
}

func (c *ConversationMgo) FindRandConversation(ctx context.Context, ts int64, limit int) ([]*model.Conversation, error) {
pipeline := []bson.M{
{
"$match": bson.M{
"is_msg_destruct": true,
"msg_destruct_time": bson.M{"$ne": 0},
},
},
{
"$addFields": bson.M{
"next_msg_destruct_timestamp": bson.M{
"$add": []any{
bson.M{
"$toLong": "$latest_msg_destruct_time",
}, "$msg_destruct_time"},
},
},
},
{
"$match": bson.M{
"next_msg_destruct_timestamp": bson.M{"$lt": ts},
},
},
{
"$sample": bson.M{
"size": limit,
},
},
}
return mongoutil.Aggregate[*model.Conversation](ctx, c.coll, pipeline)
}
Loading

0 comments on commit 8807597

Please sign in to comment.