diff --git a/go.mod b/go.mod index efdac43f83..03a7a4d4dd 100644 --- a/go.mod +++ b/go.mod @@ -12,7 +12,7 @@ require ( github.com/gorilla/websocket v1.5.1 github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/mitchellh/mapstructure v1.5.0 - github.com/openimsdk/protocol v0.0.72-alpha.69 + github.com/openimsdk/protocol v0.0.72-alpha.70 github.com/openimsdk/tools v0.0.50-alpha.63 github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_golang v1.18.0 diff --git a/go.sum b/go.sum index f87e038d00..4c297134d5 100644 --- a/go.sum +++ b/go.sum @@ -347,8 +347,8 @@ github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y= github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM= github.com/openimsdk/gomake v0.0.15-alpha.2 h1:5Q8yl8ezy2yx+q8/ucU/t4kJnDfCzNOrkXcDACCqtyM= github.com/openimsdk/gomake v0.0.15-alpha.2/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= -github.com/openimsdk/protocol v0.0.72-alpha.69 h1:b22oY2XTdBR/BePqA73KsrM3GDF3Vk8YcBEXZU4ArJc= -github.com/openimsdk/protocol v0.0.72-alpha.69/go.mod h1:Iet+piS/jaS+kWWyj6EEr36mk4ISzIRYjoMSVA4dq2M= +github.com/openimsdk/protocol v0.0.72-alpha.70 h1:j7vB81+rTthijRda2b8tlli9oWvPxr4yXHwZ8nPZIBQ= +github.com/openimsdk/protocol v0.0.72-alpha.70/go.mod h1:Iet+piS/jaS+kWWyj6EEr36mk4ISzIRYjoMSVA4dq2M= github.com/openimsdk/tools v0.0.50-alpha.63 h1:dPoVvg4KWqYX/xtK3j96TwX2A/4jwT5S5XIHvSM9hTY= github.com/openimsdk/tools v0.0.50-alpha.63/go.mod h1:B+oqV0zdewN7OiEHYJm+hW+8/Te7B8tHHgD8rK5ZLZk= github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM= diff --git a/internal/msggateway/client.go b/internal/msggateway/client.go index 1040f2be23..2890764355 100644 --- a/internal/msggateway/client.go +++ b/internal/msggateway/client.go @@ -242,6 +242,8 @@ func (c *Client) handleMessage(message []byte) error { resp, messageErr = c.setAppBackgroundStatus(ctx, binaryReq) case WsSubUserOnlineStatus: resp, messageErr = c.longConnServer.SubUserOnlineStatus(ctx, c, binaryReq) + case WsPullConvLastMessage: + resp, messageErr = c.longConnServer.GetLastMessage(ctx, binaryReq) default: return fmt.Errorf( "ReqIdentifier failed,sendID:%s,msgIncr:%s,reqIdentifier:%d", diff --git a/internal/msggateway/constant.go b/internal/msggateway/constant.go index a825c05196..b77cc44c30 100644 --- a/internal/msggateway/constant.go +++ b/internal/msggateway/constant.go @@ -52,6 +52,7 @@ const ( WsLogoutMsg = 2003 WsSetBackgroundStatus = 2004 WsSubUserOnlineStatus = 2005 + WsPullConvLastMessage = 2006 WSDataError = 3001 ) diff --git a/internal/msggateway/message_handler.go b/internal/msggateway/message_handler.go index 9b59867d61..ca15e1ef63 100644 --- a/internal/msggateway/message_handler.go +++ b/internal/msggateway/message_handler.go @@ -108,6 +108,7 @@ type MessageHandler interface { GetSeqMessage(ctx context.Context, data *Req) ([]byte, error) UserLogout(ctx context.Context, data *Req) ([]byte, error) SetUserDeviceBackground(ctx context.Context, data *Req) ([]byte, bool, error) + GetLastMessage(ctx context.Context, data *Req) ([]byte, error) } var _ MessageHandler = (*GrpcHandler)(nil) @@ -266,3 +267,15 @@ func (g *GrpcHandler) SetUserDeviceBackground(ctx context.Context, data *Req) ([ } return nil, req.IsBackground, nil } + +func (g *GrpcHandler) GetLastMessage(ctx context.Context, data *Req) ([]byte, error) { + var req msg.GetLastMessageReq + if err := proto.Unmarshal(data.Data, &req); err != nil { + return nil, err + } + resp, err := g.msgClient.GetLastMessage(ctx, &req) + if err != nil { + return nil, err + } + return proto.Marshal(resp) +} diff --git a/internal/rpc/msg/sync_msg.go b/internal/rpc/msg/sync_msg.go index 7d4ffa3e62..6cf1c21d34 100644 --- a/internal/rpc/msg/sync_msg.go +++ b/internal/rpc/msg/sync_msg.go @@ -245,3 +245,11 @@ func (m *msgServer) SearchMessage(ctx context.Context, req *msg.SearchMessageReq func (m *msgServer) GetServerTime(ctx context.Context, _ *msg.GetServerTimeReq) (*msg.GetServerTimeResp, error) { return &msg.GetServerTimeResp{ServerTime: timeutil.GetCurrentTimestampByMill()}, nil } + +func (m *msgServer) GetLastMessage(ctx context.Context, req *msg.GetLastMessageReq) (*msg.GetLastMessageResp, error) { + msgs, err := m.MsgDatabase.GetLastMessage(ctx, req.ConversationIDs, req.UserID) + if err != nil { + return nil, err + } + return &msg.GetLastMessageResp{Msgs: msgs}, nil +} diff --git a/pkg/common/storage/controller/msg.go b/pkg/common/storage/controller/msg.go index d5ad12584c..a93d581eb8 100644 --- a/pkg/common/storage/controller/msg.go +++ b/pkg/common/storage/controller/msg.go @@ -97,6 +97,8 @@ type CommonMsgDatabase interface { DeleteDoc(ctx context.Context, docID string) error GetLastMessageSeqByTime(ctx context.Context, conversationID string, time int64) (int64, error) + + GetLastMessage(ctx context.Context, conversationIDS []string, userID string) (map[string]*sdkws.MsgData, error) } func NewCommonMsgDatabase(msgDocModel database.Msg, msg cache.MsgCache, seqUser cache.SeqUser, seqConversation cache.SeqConversationCache, kafkaConf *config.Kafka) (CommonMsgDatabase, error) { @@ -811,8 +813,29 @@ func (db *commonMsgDatabase) GetMessageBySeqs(ctx context.Context, conversationI if v, ok := seqMsgs[seq]; ok { res = append(res, convert.MsgDB2Pb(v.Msg)) } else { - res = append(res, &sdkws.MsgData{Seq: seq}) + res = append(res, &sdkws.MsgData{Seq: seq, Status: constant.MsgStatusHasDeleted}) + } + } + return res, nil +} + +func (db *commonMsgDatabase) GetLastMessage(ctx context.Context, conversationIDs []string, userID string) (map[string]*sdkws.MsgData, error) { + res := make(map[string]*sdkws.MsgData) + for _, conversationID := range conversationIDs { + if _, ok := res[conversationID]; ok { + continue + } + msg, err := db.msgDocDatabase.GetLastMessage(ctx, conversationID) + if err != nil { + if errs.Unwrap(err) == mongo.ErrNoDocuments { + continue + } + return nil, err } + tmp := []*model.MsgInfoModel{msg} + db.handlerDeleteAndRevoked(ctx, userID, tmp) + db.handlerQuote(ctx, userID, conversationID, tmp) + res[conversationID] = convert.MsgDB2Pb(msg.Msg) } return res, nil } diff --git a/pkg/common/storage/database/mgo/msg.go b/pkg/common/storage/database/mgo/msg.go index 03ebff6110..c440d44420 100644 --- a/pkg/common/storage/database/mgo/msg.go +++ b/pkg/common/storage/database/mgo/msg.go @@ -997,6 +997,68 @@ func (m *MsgMgo) GetLastMessageSeqByTime(ctx context.Context, conversationID str return seq, nil } +func (m *MsgMgo) GetLastMessage(ctx context.Context, conversationID string) (*model.MsgInfoModel, error) { + pipeline := []bson.M{ + { + "$match": bson.M{ + "doc_id": bson.M{ + "$regex": fmt.Sprintf("^%s", conversationID), + }, + }, + }, + { + "$match": bson.M{ + "msgs.msg.status": bson.M{ + "$lt": constant.MsgStatusHasDeleted, + }, + }, + }, + { + "$sort": bson.M{ + "_id": -1, + }, + }, + { + "$limit": 1, + }, + { + "$project": bson.M{ + "_id": 0, + "doc_id": 0, + }, + }, + { + "$unwind": "$msgs", + }, + { + "$match": bson.M{ + "msgs.msg.status": bson.M{ + "$lt": constant.MsgStatusHasDeleted, + }, + }, + }, + { + "$sort": bson.M{ + "msgs.msg.seq": -1, + }, + }, + { + "$limit": 1, + }, + } + type Result struct { + Msgs *model.MsgInfoModel `bson:"msgs"` + } + res, err := mongoutil.Aggregate[*Result](ctx, m.coll, pipeline) + if err != nil { + return nil, err + } + if len(res) == 0 { + return nil, errs.Wrap(mongo.ErrNoDocuments) + } + return res[0].Msgs, nil +} + func (m *MsgMgo) onlyFindDocIndex(ctx context.Context, docID string, indexes []int64) ([]*model.MsgInfoModel, error) { if len(indexes) == 0 { return nil, nil diff --git a/pkg/common/storage/database/msg.go b/pkg/common/storage/database/msg.go index b44e702964..e3c4e8ece4 100644 --- a/pkg/common/storage/database/msg.go +++ b/pkg/common/storage/database/msg.go @@ -39,5 +39,6 @@ type Msg interface { DeleteDoc(ctx context.Context, docID string) error GetRandBeforeMsg(ctx context.Context, ts int64, limit int) ([]*model.MsgDocModel, error) GetLastMessageSeqByTime(ctx context.Context, conversationID string, time int64) (int64, error) + GetLastMessage(ctx context.Context, conversationID string) (*model.MsgInfoModel, error) FindSeqs(ctx context.Context, conversationID string, seqs []int64) ([]*model.MsgInfoModel, error) }