diff --git a/internal/api/router.go b/internal/api/router.go index d6bf4e130b..33f16ade32 100644 --- a/internal/api/router.go +++ b/internal/api/router.go @@ -1,7 +1,9 @@ package api import ( + "context" "fmt" + "github.com/openimsdk/protocol/user" "net/http" "strings" @@ -164,6 +166,7 @@ func newGinRouter(disCov discovery.SvcDiscoveryRegistry, config *Config) *gin.En authRouterGroup.POST("/get_user_token", a.GetUserToken) authRouterGroup.POST("/parse_token", a.ParseToken) authRouterGroup.POST("/force_logout", a.ForceLogout) + } // Third service thirdGroup := r.Group("/third") @@ -297,3 +300,16 @@ var Whitelist = []string{ "/auth/get_admin_token", "/auth/parse_token", } + +func init() { + + var uc user.UserClient + var g *gin.Engine + g.POST("/get_admin_token", New(uc, uc.AccountCheck)) + +} + +func New[A, B, C any](c C, fn func(c C, ctx context.Context, req *A, opts ...grpc.CallOption) (*B, error)) func(c *gin.Context) { + + return nil +} diff --git a/internal/rpc/conversation/conversation.go b/internal/rpc/conversation/conversation.go index f534ca6dea..9fcd5abb1d 100644 --- a/internal/rpc/conversation/conversation.go +++ b/internal/rpc/conversation/conversation.go @@ -763,3 +763,32 @@ func (c *conversationServer) GetPinnedConversationIDs(ctx context.Context, req * } return &pbconversation.GetPinnedConversationIDsResp{ConversationIDs: conversationIDs}, nil } + +func (c *conversationServer) ClearUserConversationMsg(ctx context.Context, req *pbconversation.ClearUserConversationMsgReq) (*pbconversation.ClearUserConversationMsgResp, error) { + conversations, err := c.conversationDatabase.FindRandConversation(ctx, req.Timestamp, int(req.Limit)) + if err != nil { + return nil, err + } + for _, conversation := range conversations { + if conversation.IsMsgDestruct == false || conversation.MsgDestructTime == 0 { + continue + } + rcpReq := &pbmsg.GetLastMessageSeqByTimeReq{ConversationID: conversation.ConversationID, Time: req.Timestamp - conversation.MsgDestructTime} + resp, err := pbmsg.GetLastMessageSeqByTime.Invoke(ctx, rcpReq) + if err != nil { + return nil, err + } + if resp.Seq == 0 { + continue + } + _, err = c.SetConversationMinSeq(ctx, &pbconversation.SetConversationMinSeqReq{ + ConversationID: conversation.ConversationID, + OwnerUserID: []string{conversation.OwnerUserID}, + MinSeq: resp.Seq + 1, + }) + if err != nil { + return nil, err + } + } + return &pbconversation.ClearUserConversationMsgResp{}, nil +} diff --git a/internal/rpc/msg/clear.go b/internal/rpc/msg/clear.go index 8d4bd0c8ee..eb7a4b7a60 100644 --- a/internal/rpc/msg/clear.go +++ b/internal/rpc/msg/clear.go @@ -10,7 +10,6 @@ import ( pbconv "github.com/openimsdk/protocol/conversation" "github.com/openimsdk/protocol/msg" "github.com/openimsdk/protocol/wrapperspb" - "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/log" "github.com/openimsdk/tools/mcontext" "github.com/openimsdk/tools/utils/datautil" @@ -20,16 +19,10 @@ import ( ) // hard delete in Database. -func (m *msgServer) DestructMsgs(ctx context.Context, req *msg.DestructMsgsReq) (_ *msg.DestructMsgsResp, err error) { +func (m *msgServer) DestructMsgs(ctx context.Context, req *msg.DestructMsgsReq) (*msg.DestructMsgsResp, error) { if err := authverify.CheckAdmin(ctx, m.config.Share.IMAdminUserID); err != nil { return nil, err } - if req.Timestamp > time.Now().UnixMilli() { - return nil, errs.ErrArgs.WrapMsg("request millisecond timestamp error") - } - if req.Limit <= 0 { - return nil, errs.ErrArgs.WrapMsg("request limit error") - } docs, err := m.MsgDatabase.GetRandBeforeMsg(ctx, req.Timestamp, int(req.Limit)) if err != nil { return nil, err @@ -66,7 +59,7 @@ func (m *msgServer) DestructMsgs(ctx context.Context, req *msg.DestructMsgsReq) } // soft delete for user self -func (m *msgServer) ClearMsg(ctx context.Context, req *msg.ClearMsgReq) (_ *msg.ClearMsgResp, err error) { +func (m *msgServer) ClearMsg(ctx context.Context, req *msg.ClearMsgReq) (*msg.ClearMsgResp, error) { temp := convert.ConversationsPb2DB(req.Conversations) batchNum := 100 @@ -128,3 +121,11 @@ func (m *msgServer) ClearMsg(ctx context.Context, req *msg.ClearMsgReq) (_ *msg. return nil, nil } + +func (m *msgServer) GetLastMessageSeqByTime(ctx context.Context, req *msg.GetLastMessageSeqByTimeReq) (*msg.GetLastMessageSeqByTimeResp, error) { + seq, err := m.MsgDatabase.GetLastMessageSeqByTime(ctx, req.ConversationID, req.Time) + if err != nil { + return nil, err + } + return &msg.GetLastMessageSeqByTimeResp{Seq: seq}, nil +} diff --git a/internal/tools/msg.go b/internal/tools/msg.go index 46f1573f28..5226496450 100644 --- a/internal/tools/msg.go +++ b/internal/tools/msg.go @@ -16,7 +16,7 @@ func (c *cronServer) deleteMsg() { ctx := mcontext.SetOperationID(c.ctx, operationID) log.ZDebug(ctx, "Destruct chat records", "deltime", deltime, "timestamp", deltime.UnixMilli()) const ( - deleteCount = 20 + deleteCount = 200 deleteLimit = 50 ) var count int diff --git a/internal/tools/s3.go b/internal/tools/s3.go index 3b0122c84e..469b0d0cb3 100644 --- a/internal/tools/s3.go +++ b/internal/tools/s3.go @@ -11,25 +11,69 @@ import ( func (c *cronServer) clearS3() { start := time.Now() - executeNum := 10 - // number of pagination. if need modify, need update value in third.DeleteOutdatedData - pageShowNumber := 500 deleteTime := start.Add(-time.Hour * 24 * time.Duration(c.config.CronTask.FileExpireTime)) operationID := fmt.Sprintf("cron_s3_%d_%d", os.Getpid(), deleteTime.UnixMilli()) ctx := mcontext.SetOperationID(c.ctx, operationID) log.ZDebug(ctx, "deleteoutDatedData", "deletetime", deleteTime, "timestamp", deleteTime.UnixMilli()) + const ( + deleteCount = 200 + deleteLimit = 100 + ) + var count int - for i := 1; i <= executeNum; i++ { - ctx := mcontext.SetOperationID(c.ctx, fmt.Sprintf("%s_%d", operationID, i)) - resp, err := c.thirdClient.DeleteOutdatedData(ctx, &third.DeleteOutdatedDataReq{ExpireTime: deleteTime.UnixMilli(), ObjectGroup: c.config.CronTask.DeleteObjectType, Limit: int32(pageShowNumber)}) + for i := 1; i <= deleteCount; i++ { + resp, err := c.thirdClient.DeleteOutdatedData(ctx, &third.DeleteOutdatedDataReq{ExpireTime: deleteTime.UnixMilli(), ObjectGroup: c.config.CronTask.DeleteObjectType, Limit: deleteLimit}) if err != nil { log.ZError(ctx, "cron deleteoutDatedData failed", err) return } count += int(resp.Count) - if resp.Count < int32(pageShowNumber) { + if resp.Count < deleteLimit { break } } log.ZDebug(ctx, "cron deleteoutDatedData success", "deltime", deleteTime, "cont", time.Since(start), "count", count) } + +// var req *third.DeleteOutdatedDataReq +// count1, err := ExtractField(ctx, c.thirdClient.DeleteOutdatedData, req, (*third.DeleteOutdatedDataResp).GetCount) +// +// c.thirdClient.DeleteOutdatedData(ctx, &third.DeleteOutdatedDataReq{}) +// msggateway.GetUsersOnlineStatusCaller.Invoke(ctx, &msggateway.GetUsersOnlineStatusReq{}) +// +// var cli ThirdClient +// +// c111, err := cli.DeleteOutdatedData(ctx, 100) +// +// cli.ThirdClient.DeleteOutdatedData(ctx, &third.DeleteOutdatedDataReq{}) +// +// cli.AuthSign(ctx, &third.AuthSignReq{}) +// +// cli.SetAppBadge() +// +//} +// +//func extractField[A, B, C any](ctx context.Context, fn func(ctx context.Context, req *A, opts ...grpc.CallOption) (*B, error), req *A, get func(*B) C) (C, error) { +// resp, err := fn(ctx, req) +// if err != nil { +// var c C +// return c, err +// } +// return get(resp), nil +//} +// +//func ignore(_ any, err error) error { +// return err +//} +// +//type ThirdClient struct { +// third.ThirdClient +//} +// +//func (c *ThirdClient) DeleteOutdatedData(ctx context.Context, expireTime int64) (int32, error) { +// return extractField(ctx, c.ThirdClient.DeleteOutdatedData, &third.DeleteOutdatedDataReq{ExpireTime: expireTime}, (*third.DeleteOutdatedDataResp).GetCount) +//} +// +//func (c *ThirdClient) DeleteOutdatedData1(ctx context.Context, expireTime int64) error { +// return ignore(c.ThirdClient.DeleteOutdatedData(ctx, &third.DeleteOutdatedDataReq{ExpireTime: expireTime})) +//} diff --git a/internal/tools/user_msg.go b/internal/tools/user_msg.go index 52a2b6bdce..3d4ca40d3f 100644 --- a/internal/tools/user_msg.go +++ b/internal/tools/user_msg.go @@ -3,7 +3,6 @@ package tools import ( "fmt" pbconversation "github.com/openimsdk/protocol/conversation" - "github.com/openimsdk/protocol/msg" "github.com/openimsdk/tools/log" "github.com/openimsdk/tools/mcontext" "os" @@ -14,18 +13,22 @@ func (c *cronServer) clearUserMsg() { now := time.Now() operationID := fmt.Sprintf("cron_user_msg_%d_%d", os.Getpid(), now.UnixMilli()) ctx := mcontext.SetOperationID(c.ctx, operationID) - log.ZDebug(ctx, "clear msg cron start") - conversations, err := c.conversationClient.GetConversationsNeedClearMsg(ctx, &pbconversation.GetConversationsNeedClearMsgReq{}) - if err != nil { - log.ZError(ctx, "Get conversation need Destruct msgs failed.", err) - return + log.ZDebug(ctx, "clear user msg cron start") + const ( + deleteCount = 200 + deleteLimit = 100 + ) + var count int + for i := 1; i <= deleteCount; i++ { + resp, err := c.conversationClient.ClearUserConversationMsg(ctx, &pbconversation.ClearUserConversationMsgReq{Timestamp: now.UnixMilli(), Limit: deleteLimit}) + if err != nil { + log.ZError(ctx, "ClearUserConversationMsg failed.", err) + return + } + count += int(resp.Count) + if resp.Count < deleteLimit { + break + } } - - _, err = c.msgClient.ClearMsg(ctx, &msg.ClearMsgReq{Conversations: conversations.Conversations}) - if err != nil { - log.ZError(ctx, "Clear Msg failed.", err) - return - } - - log.ZDebug(ctx, "clear msg cron task completed", "cont", time.Since(now)) + log.ZDebug(ctx, "clear user msg cron task completed", "cont", time.Since(now), "count", count) } diff --git a/pkg/common/storage/controller/conversation.go b/pkg/common/storage/controller/conversation.go index bf41cce957..d4088e0c0e 100644 --- a/pkg/common/storage/controller/conversation.go +++ b/pkg/common/storage/controller/conversation.go @@ -74,6 +74,8 @@ type ConversationDatabase interface { GetNotNotifyConversationIDs(ctx context.Context, userID string) ([]string, error) // GetPinnedConversationIDs gets pinned conversationIDs by userID GetPinnedConversationIDs(ctx context.Context, userID string) ([]string, error) + // FindRandConversation finds random conversations based on the specified timestamp and limit. + FindRandConversation(ctx context.Context, ts int64, limit int) ([]*relationtb.Conversation, error) } func NewConversationDatabase(conversation database.Conversation, cache cache.ConversationCache, tx tx.Tx) ConversationDatabase { @@ -401,3 +403,7 @@ func (c *conversationDatabase) GetPinnedConversationIDs(ctx context.Context, use } return conversationIDs, nil } + +func (c *conversationDatabase) FindRandConversation(ctx context.Context, ts int64, limit int) ([]*relationtb.Conversation, error) { + return c.conversationDB.FindRandConversation(ctx, ts, limit) +} diff --git a/pkg/common/storage/controller/msg.go b/pkg/common/storage/controller/msg.go index 88089f1a09..c29544c333 100644 --- a/pkg/common/storage/controller/msg.go +++ b/pkg/common/storage/controller/msg.go @@ -103,6 +103,8 @@ type CommonMsgDatabase interface { SetUserConversationsMinSeq(ctx context.Context, conversationID string, userID string, seq int64) error DeleteDoc(ctx context.Context, docID string) error + + GetLastMessageSeqByTime(ctx context.Context, conversationID string, time int64) (int64, error) } func NewCommonMsgDatabase(msgDocModel database.Msg, msg cache.MsgCache, seqUser cache.SeqUser, seqConversation cache.SeqConversationCache, kafkaConf *config.Kafka) (CommonMsgDatabase, error) { @@ -1016,3 +1018,7 @@ func (db *commonMsgDatabase) GetMaxSeqsWithTime(ctx context.Context, conversatio func (db *commonMsgDatabase) DeleteDoc(ctx context.Context, docID string) error { return db.msgDocDatabase.DeleteDoc(ctx, docID) } + +func (db *commonMsgDatabase) GetLastMessageSeqByTime(ctx context.Context, conversationID string, time int64) (int64, error) { + return db.msgDocDatabase.GetLastMessageSeqByTime(ctx, conversationID, time) +} diff --git a/pkg/common/storage/database/conversation.go b/pkg/common/storage/database/conversation.go index 30ca01ee71..1fb53cfed2 100644 --- a/pkg/common/storage/database/conversation.go +++ b/pkg/common/storage/database/conversation.go @@ -42,4 +42,5 @@ type Conversation interface { GetConversationIDsNeedDestruct(ctx context.Context) ([]*model.Conversation, error) GetConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) ([]string, error) FindConversationUserVersion(ctx context.Context, userID string, version uint, limit int) (*model.VersionLog, error) + FindRandConversation(ctx context.Context, ts int64, limit int) ([]*model.Conversation, error) } diff --git a/pkg/common/storage/database/mgo/conversation.go b/pkg/common/storage/database/mgo/conversation.go index 10e223c893..851ec99c40 100644 --- a/pkg/common/storage/database/mgo/conversation.go +++ b/pkg/common/storage/database/mgo/conversation.go @@ -228,3 +228,35 @@ func (c *ConversationMgo) GetConversationNotReceiveMessageUserIDs(ctx context.Co func (c *ConversationMgo) FindConversationUserVersion(ctx context.Context, userID string, version uint, limit int) (*model.VersionLog, error) { return c.version.FindChangeLog(ctx, userID, version, limit) } + +func (c *ConversationMgo) FindRandConversation(ctx context.Context, ts int64, limit int) ([]*model.Conversation, error) { + pipeline := []bson.M{ + { + "$match": bson.M{ + "is_msg_destruct": true, + "msg_destruct_time": bson.M{"$ne": 0}, + }, + }, + { + "$addFields": bson.M{ + "next_msg_destruct_timestamp": bson.M{ + "$add": []any{ + bson.M{ + "$toLong": "$latest_msg_destruct_time", + }, "$msg_destruct_time"}, + }, + }, + }, + { + "$match": bson.M{ + "next_msg_destruct_timestamp": bson.M{"$lt": ts}, + }, + }, + { + "$sample": bson.M{ + "size": limit, + }, + }, + } + return mongoutil.Aggregate[*model.Conversation](ctx, c.coll, pipeline) +} diff --git a/pkg/common/storage/database/mgo/msg.go b/pkg/common/storage/database/mgo/msg.go index 937dbae1dd..e014654663 100644 --- a/pkg/common/storage/database/mgo/msg.go +++ b/pkg/common/storage/database/mgo/msg.go @@ -1290,7 +1290,9 @@ func (m *MsgMgo) GetRandBeforeMsg(ctx context.Context, ts int64, limit int) ([]* }, }, { - "$sample": limit, + "$sample": bson.M{ + "size": limit, + }, }, }) } @@ -1307,53 +1309,56 @@ func (m *MsgMgo) DeleteMsgByIndex(ctx context.Context, docID string, index []int return mongoutil.UpdateOne(ctx, m.coll, bson.M{"doc_id": docID}, bson.M{"$set": set}, true) } -//func (m *MsgMgo) ClearMsg(ctx context.Context, t time.Time) (int64, error) { -// ts := t.UnixMilli() -// var count int64 -// for { -// msgs, err := m.GetBeforeMsg(ctx, ts, 100) -// if err != nil { -// return count, err -// } -// if len(msgs) == 0 { -// return count, nil -// } -// for _, msg := range msgs { -// num, err := m.deleteOneMsg(ctx, ts, msg) -// count += num -// if err != nil { -// return count, err -// } -// } -// } -//} - func (m *MsgMgo) DeleteDoc(ctx context.Context, docID string) error { return mongoutil.DeleteOne(ctx, m.coll, bson.M{"doc_id": docID}) } -//func (m *MsgMgo) DeleteDocMsg(ctx context.Context, ts int64, doc *relation.MsgDocModel) (int64, error) { -// var notNull int -// index := make([]int, 0, len(doc.Msg)) -// for i, message := range doc.Msg { -// if message.Msg != nil { -// notNull++ -// if message.Msg.SendTime < ts { -// index = append(index, i) -// } -// } -// } -// if len(index) == 0 { -// return 0, errs.New("no msg to delete").WrapMsg("deleteOneMsg", "docID", doc.DocID) -// } -// if len(index) == notNull { -// if err := m.DeleteDoc(ctx, doc.DocID); err != nil { -// return 0, err -// } -// } else { -// if err := m.setNullMsg(ctx, doc.DocID, index); err != nil { -// return 0, err -// } -// } -// return int64(len(index)), nil -//} +func (m *MsgMgo) GetLastMessageSeqByTime(ctx context.Context, conversationID string, time int64) (int64, error) { + pipeline := []bson.M{ + { + "$match": bson.M{ + "doc_id": bson.M{ + "$regex": fmt.Sprintf("^%s:", conversationID), + }, + }, + }, + { + "$match": bson.M{ + "msgs.msg.send_time": bson.M{ + "msgs.msg.send_time": bson.M{ + "$lte": time, + }, + }, + }, + }, + { + "$sort": bson.M{ + "_id": -1, + }, + }, + { + "$limit": 1, + }, + { + "$project": bson.M{ + "_id": 0, + "msgs.msg.send_time": 1, + "msgs.msg.seq": 1, + }, + }, + } + res, err := mongoutil.Aggregate[*model.MsgDocModel](ctx, m.coll, pipeline) + if err != nil { + return 0, err + } + if len(res) == 0 { + return 0, nil + } + var seq int64 + for _, v := range res[0].Msg { + if v.Msg.SendTime <= time { + seq = v.Msg.Seq + } + } + return seq, nil +} diff --git a/pkg/common/storage/database/mgo/msg_test.go b/pkg/common/storage/database/mgo/msg_test.go index eff847d262..7218ef1ad1 100644 --- a/pkg/common/storage/database/mgo/msg_test.go +++ b/pkg/common/storage/database/mgo/msg_test.go @@ -15,10 +15,10 @@ import ( ) func TestName1(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*300) - defer cancel() - cli := Result(mongo.Connect(ctx, options.Client().ApplyURI("mongodb://openIM:openIM123@172.16.8.66:37017/openim_v3?maxPoolSize=100").SetConnectTimeout(5*time.Second))) - + //ctx, cancel := context.WithTimeout(context.Background(), time.Second*300) + //defer cancel() + //cli := Result(mongo.Connect(ctx, options.Client().ApplyURI("mongodb://openIM:openIM123@172.16.8.66:37017/openim_v3?maxPoolSize=100").SetConnectTimeout(5*time.Second))) + // //v := &MsgMgo{ // coll: cli.Database("openim_v3").Collection("msg3"), //} @@ -44,16 +44,16 @@ func TestName1(t *testing.T) { //} // //t.Log(total) - - msg, err := NewMsgMongo(cli.Database("openim_v3")) - if err != nil { - panic(err) - } - res, err := msg.GetBeforeMsg(ctx, time.Now().UnixMilli(), []string{"1:0"}, 1000) - if err != nil { - panic(err) - } - t.Log(len(res)) + // + //msg, err := NewMsgMongo(cli.Database("openim_v3")) + //if err != nil { + // panic(err) + //} + //res, err := msg.GetBeforeMsg(ctx, time.Now().UnixMilli(), []string{"1:0"}, 1000) + //if err != nil { + // panic(err) + //} + //t.Log(len(res)) } func TestName10(t *testing.T) { @@ -95,13 +95,20 @@ func TestName4(t *testing.T) { defer cancel() cli := Result(mongo.Connect(ctx, options.Client().ApplyURI("mongodb://openIM:openIM123@172.16.8.66:37017/openim_v3?maxPoolSize=100").SetConnectTimeout(5*time.Second))) - msg, err := NewMsgMongo(cli.Database("openim_v3")) + msg, err := NewConversationMongo(cli.Database("openim_v3")) if err != nil { panic(err) } - res, err := msg.GetBeforeMsg(ctx, time.Now().UnixMilli(), []string{"1:0"}, 1000) + ts := time.Now().UnixMilli() + t.Log(ts) + res, err := msg.FindRandConversation(ctx, ts, 10) if err != nil { panic(err) } - t.Log(len(res)) + t.Log(res) +} + +func TestName5(t *testing.T) { + var v time.Time + t.Log(v.UnixMilli()) } diff --git a/pkg/common/storage/database/msg.go b/pkg/common/storage/database/msg.go index 17aaf23429..abb2a44c2f 100644 --- a/pkg/common/storage/database/msg.go +++ b/pkg/common/storage/database/msg.go @@ -48,4 +48,6 @@ type Msg interface { GetRandBeforeMsg(ctx context.Context, ts int64, limit int) ([]*model.MsgDocModel, error) GetRandDocIDs(ctx context.Context, limit int) ([]string, error) + + GetLastMessageSeqByTime(ctx context.Context, conversationID string, time int64) (int64, error) }