From 24a6641145f3dbb61995195c38fd7cb75d691cbd Mon Sep 17 00:00:00 2001 From: withchao <993506633@qq.com> Date: Thu, 31 Oct 2024 11:34:08 +0800 Subject: [PATCH] feat: stream msg --- go.mod | 2 + go.sum | 2 - internal/api/msg.go | 2 + internal/rpc/msg/notification.go | 4 + internal/rpc/msg/server.go | 5 +- internal/rpc/msg/stream_msg.go | 109 ++++++++++++++++++++ pkg/apistruct/msg.go | 5 + pkg/common/storage/controller/stream_msg.go | 12 +++ pkg/common/storage/model/stream_msg.go | 21 ++++ 9 files changed, 158 insertions(+), 4 deletions(-) create mode 100644 internal/rpc/msg/stream_msg.go create mode 100644 pkg/common/storage/controller/stream_msg.go create mode 100644 pkg/common/storage/model/stream_msg.go diff --git a/go.mod b/go.mod index 57d01c38f2..e3edae0ec8 100644 --- a/go.mod +++ b/go.mod @@ -197,3 +197,5 @@ require ( golang.org/x/crypto v0.27.0 // indirect gopkg.in/ini.v1 v1.67.0 // indirect ) + +replace github.com/openimsdk/protocol => /Users/chao/Desktop/withchao/protocol \ No newline at end of file diff --git a/go.sum b/go.sum index 94553ceddb..48a42a2517 100644 --- a/go.sum +++ b/go.sum @@ -319,8 +319,6 @@ github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y= github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM= github.com/openimsdk/gomake v0.0.14-alpha.5 h1:VY9c5x515lTfmdhhPjMvR3BBRrRquAUCFsz7t7vbv7Y= github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= -github.com/openimsdk/protocol v0.0.72-alpha.48 h1:DVeT8Kej6OjB9bsxQ0q6FU160anwfPuVmAL/1J6VzqM= -github.com/openimsdk/protocol v0.0.72-alpha.48/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8= github.com/openimsdk/tools v0.0.50-alpha.16 h1:bC1AQvJMuOHtZm8LZRvN8L5mH1Ws2VYdL+TLTs1iGSc= github.com/openimsdk/tools v0.0.50-alpha.16/go.mod h1:h1cYmfyaVtgFbKmb1Cfsl8XwUOMTt8ubVUQrdGtsUh4= github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM= diff --git a/internal/api/msg.go b/internal/api/msg.go index bf7cb83a43..e99bd4255e 100644 --- a/internal/api/msg.go +++ b/internal/api/msg.go @@ -173,6 +173,8 @@ func (m *MessageApi) getSendMsgReq(c *gin.Context, req apistruct.SendMsg) (sendM data = apistruct.AtElem{} case constant.Custom: data = apistruct.CustomElem{} + case constant.Stream: + data = apistruct.StreamMsgElem{} case constant.OANotification: data = apistruct.OANotificationElem{} req.SessionType = constant.NotificationChatType diff --git a/internal/rpc/msg/notification.go b/internal/rpc/msg/notification.go index 3b13676bf6..26e3c7f46a 100644 --- a/internal/rpc/msg/notification.go +++ b/internal/rpc/msg/notification.go @@ -48,3 +48,7 @@ func (m *MsgNotificationSender) MarkAsReadNotification(ctx context.Context, conv } m.NotificationWithSessionType(ctx, sendID, recvID, constant.HasReadReceipt, sessionType, tips) } + +func (m *MsgNotificationSender) StreamMsgNotification(ctx context.Context, sendID string, recvID string, sessionType int32, tips *sdkws.StreamMsgTips) { + m.NotificationWithSessionType(ctx, sendID, recvID, constant.StreamMsgNotification, sessionType, tips) +} diff --git a/internal/rpc/msg/server.go b/internal/rpc/msg/server.go index 91f41f1b11..a8628383a9 100644 --- a/internal/rpc/msg/server.go +++ b/internal/rpc/msg/server.go @@ -42,8 +42,9 @@ type ( // MsgServer encapsulates dependencies required for message handling. msgServer struct { - RegisterCenter discovery.SvcDiscoveryRegistry // Service discovery registry for service registration. - MsgDatabase controller.CommonMsgDatabase // Interface for message database operations. + RegisterCenter discovery.SvcDiscoveryRegistry // Service discovery registry for service registration. + MsgDatabase controller.CommonMsgDatabase // Interface for message database operations. + StreamMsgDatabase controller.StreamMsgDatabase Conversation *rpcclient.ConversationRpcClient // RPC client for conversation service. UserLocalCache *rpccache.UserLocalCache // Local cache for user data. FriendLocalCache *rpccache.FriendLocalCache // Local cache for friend data. diff --git a/internal/rpc/msg/stream_msg.go b/internal/rpc/msg/stream_msg.go new file mode 100644 index 0000000000..f959ebc797 --- /dev/null +++ b/internal/rpc/msg/stream_msg.go @@ -0,0 +1,109 @@ +package msg + +import ( + "context" + "fmt" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" + "github.com/openimsdk/open-im-server/v3/pkg/msgprocessor" + "github.com/openimsdk/protocol/constant" + "github.com/openimsdk/protocol/msg" + "github.com/openimsdk/protocol/sdkws" + "github.com/openimsdk/tools/errs" + "time" +) + +const StreamDeadlineTime = time.Second * 60 * 10 + +func (m *msgServer) handlerStreamMsg(ctx context.Context, msgData *sdkws.MsgData) error { + now := time.Now() + val := &model.StreamMsg{ + ClientMsgID: msgData.ClientMsgID, + ConversationID: msgprocessor.GetConversationIDByMsg(msgData), + UserID: msgData.SendID, + CreateTime: now, + DeadlineTime: now.Add(StreamDeadlineTime), + } + return m.StreamMsgDatabase.CreateStreamMsg(ctx, val) +} + +func (m *msgServer) getStreamMsg(ctx context.Context, clientMsgID string) (*model.StreamMsg, error) { + res, err := m.StreamMsgDatabase.GetStreamMsg(ctx, clientMsgID) + if err != nil { + return nil, err + } + if !res.End && res.DeadlineTime.Before(time.Now()) { + res.End = true + } + return res, nil +} + +func (m *msgServer) AppendStreamMsg(ctx context.Context, req *msg.AppendStreamMsgReq) (*msg.AppendStreamMsgResp, error) { + res, err := m.getStreamMsg(ctx, req.ClientMsgID) + if err != nil { + return nil, err + } + if res.End { + return nil, errs.ErrNoPermission.WrapMsg("stream msg is end") + } + if len(res.Packets) < int(req.StartIndex) { + return nil, errs.ErrNoPermission.WrapMsg("start index is invalid") + } + if val := len(res.Packets) - int(req.StartIndex); val > 0 { + exist := res.Packets[int(req.StartIndex):] + for i, s := range exist { + if len(req.Packets) == 0 { + break + } + if s != req.Packets[i] { + return nil, errs.ErrNoPermission.WrapMsg(fmt.Sprintf("packet %d has been written and is inconsistent", i)) + } + req.StartIndex++ + req.Packets = req.Packets[1:] + } + } + if len(req.Packets) == 0 && res.End == req.End { + return &msg.AppendStreamMsgResp{}, nil + } + if err := m.StreamMsgDatabase.AppendStreamMsg(ctx, req.ClientMsgID, int(req.StartIndex), req.Packets, req.End); err != nil { + return nil, err + } + conversation, err := m.Conversation.GetConversation(ctx, res.UserID, res.ConversationID) + if err != nil { + return nil, err + } + tips := &sdkws.StreamMsgTips{ + ClientMsgID: res.ClientMsgID, + StartIndex: req.StartIndex, + Packets: req.Packets, + End: req.End, + } + var ( + recvID string + sessionType int32 + ) + if conversation.GroupID == "" { + sessionType = constant.SingleChatType + recvID = conversation.UserID + } else { + sessionType = constant.ReadGroupChatType + recvID = conversation.GroupID + } + m.msgNotificationSender.StreamMsgNotification(ctx, res.UserID, recvID, sessionType, tips) + return &msg.AppendStreamMsgResp{}, nil +} + +func (m *msgServer) GetStreamMsg(ctx context.Context, req *msg.GetStreamMsgReq) (*msg.GetStreamMsgResp, error) { + res, err := m.getStreamMsg(ctx, req.ClientMsgID) + if err != nil { + return nil, err + } + return &msg.GetStreamMsgResp{ + ClientMsgID: res.ClientMsgID, + ConversationID: res.ConversationID, + UserID: res.UserID, + Packets: res.Packets, + End: res.End, + CreateTime: res.CreateTime.UnixMilli(), + DeadlineTime: res.DeadlineTime.UnixMilli(), + }, nil +} diff --git a/pkg/apistruct/msg.go b/pkg/apistruct/msg.go index dc20b5104b..dda3ff3178 100644 --- a/pkg/apistruct/msg.go +++ b/pkg/apistruct/msg.go @@ -81,6 +81,11 @@ type TextElem struct { Content string `json:"content" validate:"required"` } +type StreamMsgElem struct { + Type string `mapstructure:"type" validate:"required"` + Content string `mapstructure:"content" validate:"required"` +} + type RevokeElem struct { RevokeMsgClientID string `mapstructure:"revokeMsgClientID" validate:"required"` } diff --git a/pkg/common/storage/controller/stream_msg.go b/pkg/common/storage/controller/stream_msg.go new file mode 100644 index 0000000000..ca4402fb9c --- /dev/null +++ b/pkg/common/storage/controller/stream_msg.go @@ -0,0 +1,12 @@ +package controller + +import ( + "context" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" +) + +type StreamMsgDatabase interface { + CreateStreamMsg(ctx context.Context, model *model.StreamMsg) error + AppendStreamMsg(ctx context.Context, clientMsgID string, startIndex int, packets []string, end bool) error + GetStreamMsg(ctx context.Context, clientMsgID string) (*model.StreamMsg, error) +} diff --git a/pkg/common/storage/model/stream_msg.go b/pkg/common/storage/model/stream_msg.go new file mode 100644 index 0000000000..c040426a45 --- /dev/null +++ b/pkg/common/storage/model/stream_msg.go @@ -0,0 +1,21 @@ +package model + +import ( + "time" +) + +const ( + StreamMsgStatusWait = 0 + StreamMsgStatusDone = 1 + StreamMsgStatusFail = 2 +) + +type StreamMsg struct { + ClientMsgID string `bson:"client_msg_id"` + ConversationID string `bson:"conversation_id"` + UserID string `bson:"user_id"` + Packets []string `bson:"packets"` + End bool `bson:"end"` + CreateTime time.Time `bson:"create_time"` + DeadlineTime time.Time `bson:"deadline_time"` +}