diff --git a/cmd/openim-msgtransfer/main.go b/cmd/openim-msgtransfer/main.go index 23b629d69a..a3a14def8c 100644 --- a/cmd/openim-msgtransfer/main.go +++ b/cmd/openim-msgtransfer/main.go @@ -17,9 +17,13 @@ package main import ( "github.com/openimsdk/open-im-server/v3/pkg/common/cmd" "github.com/openimsdk/tools/system/program" + "os" ) func main() { + if len(os.Args) == 1 { + os.Args = append(os.Args, "-i", "0", "-c", "/Users/chao/Desktop/withchao/open-im-server/config/") + } if err := cmd.NewMsgTransferCmd().Exec(); err != nil { program.ExitWithError(err) } diff --git a/internal/msgtransfer/online_history_msg_handler.go b/internal/msgtransfer/online_history_msg_handler.go index 77161202cb..8c1978d48b 100644 --- a/internal/msgtransfer/online_history_msg_handler.go +++ b/internal/msgtransfer/online_history_msg_handler.go @@ -16,6 +16,7 @@ package msgtransfer import ( "context" + "encoding/json" "errors" "github.com/IBM/sarama" "github.com/go-redis/redis" @@ -89,6 +90,7 @@ func NewOnlineHistoryRedisConsumerHandler(kafkaConf *config.Kafka, database cont och.conversationRpcClient = conversationRpcClient och.groupRpcClient = groupRpcClient och.historyConsumerGroup = historyConsumerGroup + return &och, err } func (och *OnlineHistoryRedisConsumerHandler) do(ctx context.Context, channelID int, val *batcher.Msg[sarama.ConsumerMessage]) { @@ -97,6 +99,7 @@ func (och *OnlineHistoryRedisConsumerHandler) do(ctx context.Context, channelID ctx = withAggregationCtx(ctx, ctxMessages) log.ZInfo(ctx, "msg arrived channel", "channel id", channelID, "msgList length", len(ctxMessages), "key", val.Key()) + och.doSetReadSeq(ctx, ctxMessages) storageMsgList, notStorageMsgList, storageNotificationList, notStorageNotificationList := och.categorizeMessageLists(ctxMessages) @@ -110,6 +113,60 @@ func (och *OnlineHistoryRedisConsumerHandler) do(ctx context.Context, channelID och.handleNotification(ctx, val.Key(), conversationIDNotification, storageNotificationList, notStorageNotificationList) } +func (och *OnlineHistoryRedisConsumerHandler) doSetReadSeq(ctx context.Context, msgs []*ContextMsg) { + type seqKey struct { + conversationID string + userID string + } + var readSeq map[seqKey]int64 + for _, msg := range msgs { + if msg.message.ContentType != constant.HasReadReceipt { + continue + } + var elem sdkws.NotificationElem + if err := json.Unmarshal(msg.message.Content, &elem); err != nil { + log.ZError(ctx, "handlerConversationRead Unmarshal NotificationElem msg err", err, "msg", msg) + continue + } + var tips sdkws.MarkAsReadTips + if err := json.Unmarshal([]byte(elem.Detail), &tips); err != nil { + log.ZError(ctx, "handlerConversationRead Unmarshal MarkAsReadTips msg err", err, "msg", msg) + continue + } + if len(tips.Seqs) > 0 { + for _, seq := range tips.Seqs { + if tips.HasReadSeq < seq { + tips.HasReadSeq = seq + } + } + clear(tips.Seqs) + tips.Seqs = nil + } + if tips.HasReadSeq < 0 { + continue + } + if readSeq == nil { + readSeq = make(map[seqKey]int64) + } + key := seqKey{ + conversationID: tips.ConversationID, + userID: tips.MarkAsReadUserID, + } + if readSeq[key] > tips.HasReadSeq { + continue + } + readSeq[key] = tips.HasReadSeq + } + if readSeq == nil { + return + } + for key, seq := range readSeq { + if err := och.msgDatabase.SetHasReadSeqToDB(ctx, key.userID, key.conversationID, seq); err != nil { + log.ZError(ctx, "set read seq to db error", err, "userID", key.userID, "conversationID", key.conversationID, "seq", seq) + } + } +} + func (och *OnlineHistoryRedisConsumerHandler) parseConsumerMessages(ctx context.Context, consumerMessages []*sarama.ConsumerMessage) []*ContextMsg { var ctxMessages []*ContextMsg for i := 0; i < len(consumerMessages); i++ { diff --git a/internal/msgtransfer/online_msg_to_mongo_handler.go b/internal/msgtransfer/online_msg_to_mongo_handler.go index a72fb4792b..cea47fcd50 100644 --- a/internal/msgtransfer/online_msg_to_mongo_handler.go +++ b/internal/msgtransfer/online_msg_to_mongo_handler.go @@ -16,7 +16,6 @@ package msgtransfer import ( "context" - "github.com/IBM/sarama" "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" diff --git a/internal/push/push_handler.go b/internal/push/push_handler.go index 8979214dbe..8ecb3dad1a 100644 --- a/internal/push/push_handler.go +++ b/internal/push/push_handler.go @@ -17,7 +17,6 @@ package push import ( "context" "encoding/json" - "fmt" "github.com/IBM/sarama" "github.com/openimsdk/open-im-server/v3/internal/push/offlinepush" "github.com/openimsdk/open-im-server/v3/internal/push/offlinepush/options" @@ -36,17 +35,11 @@ import ( "github.com/openimsdk/tools/log" "github.com/openimsdk/tools/mcontext" "github.com/openimsdk/tools/mq/kafka" - "github.com/openimsdk/tools/mq/memamq" "github.com/openimsdk/tools/utils/datautil" "github.com/openimsdk/tools/utils/jsonutil" "github.com/openimsdk/tools/utils/timeutil" "github.com/redis/go-redis/v9" "google.golang.org/protobuf/proto" - "math/rand" - "os" - "strconv" - "sync/atomic" - "time" ) type ConsumerHandler struct { @@ -61,7 +54,6 @@ type ConsumerHandler struct { groupRpcClient rpcclient.GroupRpcClient webhookClient *webhook.Client config *Config - readCh chan *sdkws.MarkAsReadTips } func NewConsumerHandler(config *Config, offlinePusher offlinepush.OfflinePusher, rdb redis.UniversalClient, @@ -84,8 +76,6 @@ func NewConsumerHandler(config *Config, offlinePusher offlinepush.OfflinePusher, consumerHandler.webhookClient = webhook.NewWebhookClient(config.WebhooksConfig.URL) consumerHandler.config = config consumerHandler.onlineCache = rpccache.NewOnlineCache(userRpcClient, consumerHandler.groupLocalCache, rdb, nil) - consumerHandler.readCh = make(chan *sdkws.MarkAsReadTips, 1024*8) - go consumerHandler.loopRead() return &consumerHandler, nil } @@ -99,7 +89,6 @@ func (c *ConsumerHandler) handleMs2PsChat(ctx context.Context, msg []byte) { MsgData: msgFromMQ.MsgData, ConversationID: msgFromMQ.ConversationID, } - c.handlerConversationRead(ctx, pbData.MsgData) sec := msgFromMQ.MsgData.SendTime / 1000 nowSec := timeutil.GetCurrentTimestampBySecond() log.ZDebug(ctx, "push msg", "msg", pbData.String(), "sec", sec, "nowSec", nowSec) @@ -129,98 +118,6 @@ func (*ConsumerHandler) Setup(sarama.ConsumerGroupSession) error { return nil } func (*ConsumerHandler) Cleanup(sarama.ConsumerGroupSession) error { return nil } -func (c *ConsumerHandler) loopRead() { - type markKey struct { - ConversationID string - UserID string - } - type markSeq struct { - ReadSeq int64 - MarkSeq int64 - Count int64 - } - type asyncRequest struct { - ConversationID string - UserID string - ReadSeq int64 - } - ctx := context.Background() - ctx = mcontext.WithOpUserIDContext(ctx, c.config.Share.IMAdminUserID[0]) - opIDPrefix := fmt.Sprintf("mark_read_%d_%d_", os.Getpid(), rand.Uint32()) - var incr atomic.Uint64 - maxSeq := make(map[markKey]*markSeq, 1024*8) - queue := memamq.NewMemoryQueue(32, 1024) - defer queue.Stop() - ticker := time.NewTicker(time.Second * 10) - defer ticker.Stop() - for { - select { - case <-ticker.C: - var markSeqs []asyncRequest - for key, seq := range maxSeq { - if seq.MarkSeq >= seq.ReadSeq { - seq.Count++ - if seq.Count > 6 { - delete(maxSeq, key) - } - continue - } - seq.Count = 0 - seq.MarkSeq = seq.ReadSeq - markSeqs = append(markSeqs, asyncRequest{ - ConversationID: key.ConversationID, - UserID: key.UserID, - ReadSeq: seq.ReadSeq, - }) - } - if len(markSeqs) == 0 { - continue - } - go func() { - for i := range markSeqs { - seq := markSeqs[i] - _ = queue.PushCtx(ctx, func() { - ctx = mcontext.SetOperationID(ctx, opIDPrefix+strconv.FormatUint(incr.Add(1), 10)) - _, err := c.msgRpcClient.Client.SetConversationHasReadSeq(ctx, &pbchat.SetConversationHasReadSeqReq{ - ConversationID: seq.ConversationID, - UserID: seq.UserID, - HasReadSeq: seq.ReadSeq, - NoNotification: true, - }) - if err != nil { - log.ZError(ctx, "ConsumerHandler SetConversationHasReadSeq", err, "conversationID", seq.ConversationID, "userID", seq.UserID, "readSeq", seq.ReadSeq) - } - }) - } - }() - - case tips, ok := <-c.readCh: - if !ok { - return - } - if tips.HasReadSeq <= 0 { - continue - } - key := markKey{ - ConversationID: tips.ConversationID, - UserID: tips.MarkAsReadUserID, - } - ms, ok := maxSeq[key] - if ok { - if ms.ReadSeq < tips.HasReadSeq { - ms.ReadSeq = tips.HasReadSeq - } - } else { - ms = &markSeq{ - ReadSeq: tips.HasReadSeq, - MarkSeq: 0, - } - maxSeq[key] = ms - } - } - } -} - func (c *ConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { for msg := range claim.Messages() { ctx := c.pushConsumerGroup.GetContextFromMsg(msg) @@ -318,39 +215,6 @@ func (c *ConsumerHandler) GetConnsAndOnlinePush(ctx context.Context, msg *sdkws. return result, nil } -func (c *ConsumerHandler) handlerConversationRead(ctx context.Context, msg *sdkws.MsgData) { - if msg.ContentType != constant.HasReadReceipt { - return - } - var elem sdkws.NotificationElem - if err := json.Unmarshal(msg.Content, &elem); err != nil { - log.ZError(ctx, "handlerConversationRead Unmarshal NotificationElem msg err", err, "msg", msg) - return - } - var tips sdkws.MarkAsReadTips - if err := json.Unmarshal([]byte(elem.Detail), &tips); err != nil { - log.ZError(ctx, "handlerConversationRead Unmarshal MarkAsReadTips msg err", err, "msg", msg) - return - } - if len(tips.Seqs) > 0 { - for _, seq := range tips.Seqs { - if tips.HasReadSeq < seq { - tips.HasReadSeq = seq - } - } - clear(tips.Seqs) - tips.Seqs = nil - } - if tips.HasReadSeq < 0 { - return - } - select { - case c.readCh <- &tips: - default: - log.ZWarn(ctx, "handlerConversationRead readCh is full", nil, "markAsReadTips", &tips) - } -} - func (c *ConsumerHandler) Push2Group(ctx context.Context, groupID string, msg *sdkws.MsgData) (err error) { log.ZDebug(ctx, "Get group msg from msg_transfer and push msg", "msg", msg.String(), "groupID", groupID) var pushToUserIDs []string diff --git a/internal/rpc/msg/as_read.go b/internal/rpc/msg/as_read.go index 9a3174e8ef..bfba4824fe 100644 --- a/internal/rpc/msg/as_read.go +++ b/internal/rpc/msg/as_read.go @@ -80,16 +80,10 @@ func (m *msgServer) SetConversationHasReadSeq(ctx context.Context, req *msg.SetC if req.HasReadSeq > maxSeq { return nil, errs.ErrArgs.WrapMsg("hasReadSeq must not be bigger than maxSeq") } - if req.NoNotification { - if err := m.MsgDatabase.SetHasReadSeqToDB(ctx, req.UserID, req.ConversationID, req.HasReadSeq); err != nil { - return nil, err - } - } else { - if err := m.MsgDatabase.SetHasReadSeq(ctx, req.UserID, req.ConversationID, req.HasReadSeq); err != nil { - return nil, err - } - m.sendMarkAsReadNotification(ctx, req.ConversationID, constant.SingleChatType, req.UserID, req.UserID, nil, req.HasReadSeq) + if err := m.MsgDatabase.SetHasReadSeq(ctx, req.UserID, req.ConversationID, req.HasReadSeq); err != nil { + return nil, err } + m.sendMarkAsReadNotification(ctx, req.ConversationID, constant.SingleChatType, req.UserID, req.UserID, nil, req.HasReadSeq) return &msg.SetConversationHasReadSeqResp{}, nil } diff --git a/pkg/common/storage/cache/redis/seq_user.go b/pkg/common/storage/cache/redis/seq_user.go index 5458b6594b..0cedfeee12 100644 --- a/pkg/common/storage/cache/redis/seq_user.go +++ b/pkg/common/storage/cache/redis/seq_user.go @@ -73,18 +73,23 @@ func (s *seqUserCacheRedis) GetUserReadSeq(ctx context.Context, conversationID s }) } -func (s *seqUserCacheRedis) SetUserReadSeq(ctx context.Context, conversationID string, userID string, seq int64, writeDB bool) error { - if err := s.rocks.RawSet(ctx, s.getSeqUserReadSeqKey(conversationID, userID), strconv.Itoa(int(seq)), s.readExpireTime); err != nil { - return errs.Wrap(err) +func (s *seqUserCacheRedis) SetUserReadSeq(ctx context.Context, conversationID string, userID string, seq int64) error { + dbSeq, err := s.GetUserReadSeq(ctx, conversationID, userID) + if err != nil { + return err } - if writeDB { - if err := s.mgo.SetUserReadSeq(ctx, conversationID, userID, seq); err != nil { - return err + if dbSeq < seq { + if err := s.rocks.RawSet(ctx, s.getSeqUserReadSeqKey(conversationID, userID), strconv.Itoa(int(seq)), s.readExpireTime); err != nil { + return errs.Wrap(err) } } return nil } +func (s *seqUserCacheRedis) SetUserReadSeqToDB(ctx context.Context, conversationID string, userID string, seq int64) error { + return s.mgo.SetUserReadSeq(ctx, conversationID, userID, seq) +} + func (s *seqUserCacheRedis) SetUserMinSeqs(ctx context.Context, userID string, seqs map[string]int64) error { keys := make([]string, 0, len(seqs)) for conversationID, seq := range seqs { @@ -128,13 +133,6 @@ func (s *seqUserCacheRedis) SetUserReadSeqs(ctx context.Context, userID string, if err := s.setUserRedisReadSeqs(ctx, userID, seqs); err != nil { return err } - for conversationID, seq := range seqs { - if seq%s.readSeqWriteRatio == 0 { - if err := s.mgo.SetUserReadSeq(ctx, conversationID, userID, seq); err != nil { - return err - } - } - } return nil } diff --git a/pkg/common/storage/cache/seq_user.go b/pkg/common/storage/cache/seq_user.go index 5bb9cad043..cef414e16e 100644 --- a/pkg/common/storage/cache/seq_user.go +++ b/pkg/common/storage/cache/seq_user.go @@ -8,7 +8,8 @@ type SeqUser interface { GetUserMinSeq(ctx context.Context, conversationID string, userID string) (int64, error) SetUserMinSeq(ctx context.Context, conversationID string, userID string, seq int64) error GetUserReadSeq(ctx context.Context, conversationID string, userID string) (int64, error) - SetUserReadSeq(ctx context.Context, conversationID string, userID string, seq int64, writeDB bool) error + SetUserReadSeq(ctx context.Context, conversationID string, userID string, seq int64) error + SetUserReadSeqToDB(ctx context.Context, conversationID string, userID string, seq int64) error SetUserMinSeqs(ctx context.Context, userID string, seqs map[string]int64) error SetUserReadSeqs(ctx context.Context, userID string, seqs map[string]int64) error GetUserReadSeqs(ctx context.Context, userID string, conversationIDs []string) (map[string]int64, error) diff --git a/pkg/common/storage/controller/msg.go b/pkg/common/storage/controller/msg.go index 6a75afcb12..7f884165de 100644 --- a/pkg/common/storage/controller/msg.go +++ b/pkg/common/storage/controller/msg.go @@ -338,7 +338,7 @@ func (db *commonMsgDatabase) DeleteMessagesFromCache(ctx context.Context, conver func (db *commonMsgDatabase) setHasReadSeqs(ctx context.Context, conversationID string, userSeqMap map[string]int64) error { for userID, seq := range userSeqMap { - if err := db.seqUser.SetUserReadSeq(ctx, conversationID, userID, seq, false); err != nil { + if err := db.seqUser.SetUserReadSeq(ctx, conversationID, userID, seq); err != nil { return err } } @@ -806,11 +806,11 @@ func (db *commonMsgDatabase) UserSetHasReadSeqs(ctx context.Context, userID stri } func (db *commonMsgDatabase) SetHasReadSeq(ctx context.Context, userID string, conversationID string, hasReadSeq int64) error { - return db.seqUser.SetUserReadSeq(ctx, conversationID, userID, hasReadSeq, false) + return db.seqUser.SetUserReadSeq(ctx, conversationID, userID, hasReadSeq) } func (db *commonMsgDatabase) SetHasReadSeqToDB(ctx context.Context, userID string, conversationID string, hasReadSeq int64) error { - return db.seqUser.SetUserReadSeq(ctx, conversationID, userID, hasReadSeq, true) + return db.seqUser.SetUserReadSeqToDB(ctx, conversationID, userID, hasReadSeq) } func (db *commonMsgDatabase) GetHasReadSeqs(ctx context.Context, userID string, conversationIDs []string) (map[string]int64, error) { diff --git a/pkg/common/storage/database/mgo/seq_user.go b/pkg/common/storage/database/mgo/seq_user.go index 9faad416ae..244de30000 100644 --- a/pkg/common/storage/database/mgo/seq_user.go +++ b/pkg/common/storage/database/mgo/seq_user.go @@ -115,5 +115,12 @@ func (s *seqUserMongo) GetUserReadSeqs(ctx context.Context, userID string, conve } func (s *seqUserMongo) SetUserReadSeq(ctx context.Context, conversationID string, userID string, seq int64) error { + dbSeq, err := s.GetUserReadSeq(ctx, conversationID, userID) + if err != nil { + return err + } + if dbSeq > seq { + return nil + } return s.setSeq(ctx, conversationID, userID, seq, "read_seq") }