Skip to content

Commit

Permalink
read seq is written to mongo
Browse files Browse the repository at this point in the history
  • Loading branch information
withchao committed Aug 26, 2024
1 parent 5f8713a commit 764b47e
Show file tree
Hide file tree
Showing 9 changed files with 87 additions and 163 deletions.
4 changes: 4 additions & 0 deletions cmd/openim-msgtransfer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,13 @@ package main
import (
"github.com/openimsdk/open-im-server/v3/pkg/common/cmd"
"github.com/openimsdk/tools/system/program"
"os"
)

func main() {
if len(os.Args) == 1 {
os.Args = append(os.Args, "-i", "0", "-c", "/Users/chao/Desktop/withchao/open-im-server/config/")
}
if err := cmd.NewMsgTransferCmd().Exec(); err != nil {
program.ExitWithError(err)
}
Expand Down
57 changes: 57 additions & 0 deletions internal/msgtransfer/online_history_msg_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package msgtransfer

import (
"context"
"encoding/json"
"errors"
"github.com/IBM/sarama"
"github.com/go-redis/redis"
Expand Down Expand Up @@ -89,6 +90,7 @@ func NewOnlineHistoryRedisConsumerHandler(kafkaConf *config.Kafka, database cont
och.conversationRpcClient = conversationRpcClient
och.groupRpcClient = groupRpcClient
och.historyConsumerGroup = historyConsumerGroup

return &och, err
}
func (och *OnlineHistoryRedisConsumerHandler) do(ctx context.Context, channelID int, val *batcher.Msg[sarama.ConsumerMessage]) {
Expand All @@ -97,6 +99,7 @@ func (och *OnlineHistoryRedisConsumerHandler) do(ctx context.Context, channelID
ctx = withAggregationCtx(ctx, ctxMessages)
log.ZInfo(ctx, "msg arrived channel", "channel id", channelID, "msgList length", len(ctxMessages),
"key", val.Key())
och.doSetReadSeq(ctx, ctxMessages)

storageMsgList, notStorageMsgList, storageNotificationList, notStorageNotificationList :=
och.categorizeMessageLists(ctxMessages)
Expand All @@ -110,6 +113,60 @@ func (och *OnlineHistoryRedisConsumerHandler) do(ctx context.Context, channelID
och.handleNotification(ctx, val.Key(), conversationIDNotification, storageNotificationList, notStorageNotificationList)
}

func (och *OnlineHistoryRedisConsumerHandler) doSetReadSeq(ctx context.Context, msgs []*ContextMsg) {
type seqKey struct {
conversationID string
userID string
}
var readSeq map[seqKey]int64
for _, msg := range msgs {
if msg.message.ContentType != constant.HasReadReceipt {
continue
}
var elem sdkws.NotificationElem
if err := json.Unmarshal(msg.message.Content, &elem); err != nil {
log.ZError(ctx, "handlerConversationRead Unmarshal NotificationElem msg err", err, "msg", msg)
continue
}
var tips sdkws.MarkAsReadTips
if err := json.Unmarshal([]byte(elem.Detail), &tips); err != nil {
log.ZError(ctx, "handlerConversationRead Unmarshal MarkAsReadTips msg err", err, "msg", msg)
continue
}
if len(tips.Seqs) > 0 {
for _, seq := range tips.Seqs {
if tips.HasReadSeq < seq {
tips.HasReadSeq = seq
}
}
clear(tips.Seqs)
tips.Seqs = nil
}
if tips.HasReadSeq < 0 {
continue
}
if readSeq == nil {
readSeq = make(map[seqKey]int64)
}
key := seqKey{
conversationID: tips.ConversationID,
userID: tips.MarkAsReadUserID,
}
if readSeq[key] > tips.HasReadSeq {
continue
}
readSeq[key] = tips.HasReadSeq
}
if readSeq == nil {
return
}
for key, seq := range readSeq {
if err := och.msgDatabase.SetHasReadSeqToDB(ctx, key.userID, key.conversationID, seq); err != nil {
log.ZError(ctx, "set read seq to db error", err, "userID", key.userID, "conversationID", key.conversationID, "seq", seq)
}
}
}

func (och *OnlineHistoryRedisConsumerHandler) parseConsumerMessages(ctx context.Context, consumerMessages []*sarama.ConsumerMessage) []*ContextMsg {
var ctxMessages []*ContextMsg
for i := 0; i < len(consumerMessages); i++ {
Expand Down
1 change: 0 additions & 1 deletion internal/msgtransfer/online_msg_to_mongo_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package msgtransfer

import (
"context"

"github.com/IBM/sarama"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
Expand Down
136 changes: 0 additions & 136 deletions internal/push/push_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package push
import (
"context"
"encoding/json"
"fmt"
"github.com/IBM/sarama"
"github.com/openimsdk/open-im-server/v3/internal/push/offlinepush"
"github.com/openimsdk/open-im-server/v3/internal/push/offlinepush/options"
Expand All @@ -36,17 +35,11 @@ import (
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/mcontext"
"github.com/openimsdk/tools/mq/kafka"
"github.com/openimsdk/tools/mq/memamq"
"github.com/openimsdk/tools/utils/datautil"
"github.com/openimsdk/tools/utils/jsonutil"
"github.com/openimsdk/tools/utils/timeutil"
"github.com/redis/go-redis/v9"
"google.golang.org/protobuf/proto"
"math/rand"
"os"
"strconv"
"sync/atomic"
"time"
)

type ConsumerHandler struct {
Expand All @@ -61,7 +54,6 @@ type ConsumerHandler struct {
groupRpcClient rpcclient.GroupRpcClient
webhookClient *webhook.Client
config *Config
readCh chan *sdkws.MarkAsReadTips
}

func NewConsumerHandler(config *Config, offlinePusher offlinepush.OfflinePusher, rdb redis.UniversalClient,
Expand All @@ -84,8 +76,6 @@ func NewConsumerHandler(config *Config, offlinePusher offlinepush.OfflinePusher,
consumerHandler.webhookClient = webhook.NewWebhookClient(config.WebhooksConfig.URL)
consumerHandler.config = config
consumerHandler.onlineCache = rpccache.NewOnlineCache(userRpcClient, consumerHandler.groupLocalCache, rdb, nil)
consumerHandler.readCh = make(chan *sdkws.MarkAsReadTips, 1024*8)
go consumerHandler.loopRead()
return &consumerHandler, nil
}

Expand All @@ -99,7 +89,6 @@ func (c *ConsumerHandler) handleMs2PsChat(ctx context.Context, msg []byte) {
MsgData: msgFromMQ.MsgData,
ConversationID: msgFromMQ.ConversationID,
}
c.handlerConversationRead(ctx, pbData.MsgData)
sec := msgFromMQ.MsgData.SendTime / 1000
nowSec := timeutil.GetCurrentTimestampBySecond()
log.ZDebug(ctx, "push msg", "msg", pbData.String(), "sec", sec, "nowSec", nowSec)
Expand Down Expand Up @@ -129,98 +118,6 @@ func (*ConsumerHandler) Setup(sarama.ConsumerGroupSession) error { return nil }

func (*ConsumerHandler) Cleanup(sarama.ConsumerGroupSession) error { return nil }

func (c *ConsumerHandler) loopRead() {
type markKey struct {
ConversationID string
UserID string
}
type markSeq struct {
ReadSeq int64
MarkSeq int64
Count int64
}
type asyncRequest struct {
ConversationID string
UserID string
ReadSeq int64
}
ctx := context.Background()
ctx = mcontext.WithOpUserIDContext(ctx, c.config.Share.IMAdminUserID[0])
opIDPrefix := fmt.Sprintf("mark_read_%d_%d_", os.Getpid(), rand.Uint32())
var incr atomic.Uint64
maxSeq := make(map[markKey]*markSeq, 1024*8)
queue := memamq.NewMemoryQueue(32, 1024)
defer queue.Stop()
ticker := time.NewTicker(time.Second * 10)
defer ticker.Stop()
for {
select {
case <-ticker.C:
var markSeqs []asyncRequest
for key, seq := range maxSeq {
if seq.MarkSeq >= seq.ReadSeq {
seq.Count++
if seq.Count > 6 {
delete(maxSeq, key)
}
continue
}
seq.Count = 0
seq.MarkSeq = seq.ReadSeq
markSeqs = append(markSeqs, asyncRequest{
ConversationID: key.ConversationID,
UserID: key.UserID,
ReadSeq: seq.ReadSeq,
})
}
if len(markSeqs) == 0 {
continue
}
go func() {
for i := range markSeqs {
seq := markSeqs[i]
_ = queue.PushCtx(ctx, func() {
ctx = mcontext.SetOperationID(ctx, opIDPrefix+strconv.FormatUint(incr.Add(1), 10))
_, err := c.msgRpcClient.Client.SetConversationHasReadSeq(ctx, &pbchat.SetConversationHasReadSeqReq{
ConversationID: seq.ConversationID,
UserID: seq.UserID,
HasReadSeq: seq.ReadSeq,
NoNotification: true,
})
if err != nil {
log.ZError(ctx, "ConsumerHandler SetConversationHasReadSeq", err, "conversationID", seq.ConversationID, "userID", seq.UserID, "readSeq", seq.ReadSeq)
}
})
}
}()

case tips, ok := <-c.readCh:
if !ok {
return
}
if tips.HasReadSeq <= 0 {
continue
}
key := markKey{
ConversationID: tips.ConversationID,
UserID: tips.MarkAsReadUserID,
}
ms, ok := maxSeq[key]
if ok {
if ms.ReadSeq < tips.HasReadSeq {
ms.ReadSeq = tips.HasReadSeq
}
} else {
ms = &markSeq{
ReadSeq: tips.HasReadSeq,
MarkSeq: 0,
}
maxSeq[key] = ms
}
}
}
}

func (c *ConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for msg := range claim.Messages() {
ctx := c.pushConsumerGroup.GetContextFromMsg(msg)
Expand Down Expand Up @@ -318,39 +215,6 @@ func (c *ConsumerHandler) GetConnsAndOnlinePush(ctx context.Context, msg *sdkws.
return result, nil
}

func (c *ConsumerHandler) handlerConversationRead(ctx context.Context, msg *sdkws.MsgData) {
if msg.ContentType != constant.HasReadReceipt {
return
}
var elem sdkws.NotificationElem
if err := json.Unmarshal(msg.Content, &elem); err != nil {
log.ZError(ctx, "handlerConversationRead Unmarshal NotificationElem msg err", err, "msg", msg)
return
}
var tips sdkws.MarkAsReadTips
if err := json.Unmarshal([]byte(elem.Detail), &tips); err != nil {
log.ZError(ctx, "handlerConversationRead Unmarshal MarkAsReadTips msg err", err, "msg", msg)
return
}
if len(tips.Seqs) > 0 {
for _, seq := range tips.Seqs {
if tips.HasReadSeq < seq {
tips.HasReadSeq = seq
}
}
clear(tips.Seqs)
tips.Seqs = nil
}
if tips.HasReadSeq < 0 {
return
}
select {
case c.readCh <- &tips:
default:
log.ZWarn(ctx, "handlerConversationRead readCh is full", nil, "markAsReadTips", &tips)
}
}

func (c *ConsumerHandler) Push2Group(ctx context.Context, groupID string, msg *sdkws.MsgData) (err error) {
log.ZDebug(ctx, "Get group msg from msg_transfer and push msg", "msg", msg.String(), "groupID", groupID)
var pushToUserIDs []string
Expand Down
12 changes: 3 additions & 9 deletions internal/rpc/msg/as_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,16 +80,10 @@ func (m *msgServer) SetConversationHasReadSeq(ctx context.Context, req *msg.SetC
if req.HasReadSeq > maxSeq {
return nil, errs.ErrArgs.WrapMsg("hasReadSeq must not be bigger than maxSeq")
}
if req.NoNotification {
if err := m.MsgDatabase.SetHasReadSeqToDB(ctx, req.UserID, req.ConversationID, req.HasReadSeq); err != nil {
return nil, err
}
} else {
if err := m.MsgDatabase.SetHasReadSeq(ctx, req.UserID, req.ConversationID, req.HasReadSeq); err != nil {
return nil, err
}
m.sendMarkAsReadNotification(ctx, req.ConversationID, constant.SingleChatType, req.UserID, req.UserID, nil, req.HasReadSeq)
if err := m.MsgDatabase.SetHasReadSeq(ctx, req.UserID, req.ConversationID, req.HasReadSeq); err != nil {
return nil, err
}
m.sendMarkAsReadNotification(ctx, req.ConversationID, constant.SingleChatType, req.UserID, req.UserID, nil, req.HasReadSeq)
return &msg.SetConversationHasReadSeqResp{}, nil
}

Expand Down
24 changes: 11 additions & 13 deletions pkg/common/storage/cache/redis/seq_user.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,18 +73,23 @@ func (s *seqUserCacheRedis) GetUserReadSeq(ctx context.Context, conversationID s
})
}

func (s *seqUserCacheRedis) SetUserReadSeq(ctx context.Context, conversationID string, userID string, seq int64, writeDB bool) error {
if err := s.rocks.RawSet(ctx, s.getSeqUserReadSeqKey(conversationID, userID), strconv.Itoa(int(seq)), s.readExpireTime); err != nil {
return errs.Wrap(err)
func (s *seqUserCacheRedis) SetUserReadSeq(ctx context.Context, conversationID string, userID string, seq int64) error {
dbSeq, err := s.GetUserReadSeq(ctx, conversationID, userID)
if err != nil {
return err
}
if writeDB {
if err := s.mgo.SetUserReadSeq(ctx, conversationID, userID, seq); err != nil {
return err
if dbSeq < seq {
if err := s.rocks.RawSet(ctx, s.getSeqUserReadSeqKey(conversationID, userID), strconv.Itoa(int(seq)), s.readExpireTime); err != nil {
return errs.Wrap(err)
}
}
return nil
}

func (s *seqUserCacheRedis) SetUserReadSeqToDB(ctx context.Context, conversationID string, userID string, seq int64) error {
return s.mgo.SetUserReadSeq(ctx, conversationID, userID, seq)
}

func (s *seqUserCacheRedis) SetUserMinSeqs(ctx context.Context, userID string, seqs map[string]int64) error {
keys := make([]string, 0, len(seqs))
for conversationID, seq := range seqs {
Expand Down Expand Up @@ -128,13 +133,6 @@ func (s *seqUserCacheRedis) SetUserReadSeqs(ctx context.Context, userID string,
if err := s.setUserRedisReadSeqs(ctx, userID, seqs); err != nil {
return err
}
for conversationID, seq := range seqs {
if seq%s.readSeqWriteRatio == 0 {
if err := s.mgo.SetUserReadSeq(ctx, conversationID, userID, seq); err != nil {
return err
}
}
}
return nil
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/common/storage/cache/seq_user.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ type SeqUser interface {
GetUserMinSeq(ctx context.Context, conversationID string, userID string) (int64, error)
SetUserMinSeq(ctx context.Context, conversationID string, userID string, seq int64) error
GetUserReadSeq(ctx context.Context, conversationID string, userID string) (int64, error)
SetUserReadSeq(ctx context.Context, conversationID string, userID string, seq int64, writeDB bool) error
SetUserReadSeq(ctx context.Context, conversationID string, userID string, seq int64) error
SetUserReadSeqToDB(ctx context.Context, conversationID string, userID string, seq int64) error
SetUserMinSeqs(ctx context.Context, userID string, seqs map[string]int64) error
SetUserReadSeqs(ctx context.Context, userID string, seqs map[string]int64) error
GetUserReadSeqs(ctx context.Context, userID string, conversationIDs []string) (map[string]int64, error)
Expand Down
6 changes: 3 additions & 3 deletions pkg/common/storage/controller/msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ func (db *commonMsgDatabase) DeleteMessagesFromCache(ctx context.Context, conver

func (db *commonMsgDatabase) setHasReadSeqs(ctx context.Context, conversationID string, userSeqMap map[string]int64) error {
for userID, seq := range userSeqMap {
if err := db.seqUser.SetUserReadSeq(ctx, conversationID, userID, seq, false); err != nil {
if err := db.seqUser.SetUserReadSeq(ctx, conversationID, userID, seq); err != nil {
return err
}
}
Expand Down Expand Up @@ -806,11 +806,11 @@ func (db *commonMsgDatabase) UserSetHasReadSeqs(ctx context.Context, userID stri
}

func (db *commonMsgDatabase) SetHasReadSeq(ctx context.Context, userID string, conversationID string, hasReadSeq int64) error {
return db.seqUser.SetUserReadSeq(ctx, conversationID, userID, hasReadSeq, false)
return db.seqUser.SetUserReadSeq(ctx, conversationID, userID, hasReadSeq)
}

func (db *commonMsgDatabase) SetHasReadSeqToDB(ctx context.Context, userID string, conversationID string, hasReadSeq int64) error {
return db.seqUser.SetUserReadSeq(ctx, conversationID, userID, hasReadSeq, true)
return db.seqUser.SetUserReadSeqToDB(ctx, conversationID, userID, hasReadSeq)
}

func (db *commonMsgDatabase) GetHasReadSeqs(ctx context.Context, userID string, conversationIDs []string) (map[string]int64, error) {
Expand Down
Loading

0 comments on commit 764b47e

Please sign in to comment.