Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: read seq is written to mongo, online status redis cluster is supported #2558

Merged
merged 85 commits into from
Aug 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
85 commits
Select commit Hold shift + click to select a range
4c3c455
fix: GroupApplicationAcceptedNotification
withchao Jan 11, 2024
f5f2cf4
Merge remote-tracking branch 'origin/main'
withchao Jan 11, 2024
ad979d4
Merge branch 'openimsdk:main' into main
withchao Jan 11, 2024
a00d77a
fix: GroupApplicationAcceptedNotification
withchao Jan 11, 2024
48ff03f
fix: NotificationUserInfoUpdate
withchao Jan 12, 2024
45859d9
Merge branch 'openimsdk:main' into main
withchao Jan 16, 2024
e3d78de
Merge branch 'openimsdk:main' into main
withchao Jan 18, 2024
c81e199
Merge branch 'openimsdk:main' into main
withchao Jan 22, 2024
4266fed
cicd: robot automated Change
withchao Jan 22, 2024
1a14191
Merge remote-tracking branch 'upstream/main'
withchao Jan 26, 2024
86d3a66
Merge branch 'openimsdk:main' into main
withchao Jan 28, 2024
1964a4b
Merge branch 'openimsdk:main' into main
withchao Jan 31, 2024
d91cde9
Merge branch 'openimsdk:main' into main
withchao Feb 5, 2024
6a05e64
Merge branch 'openimsdk:main' into main
withchao Feb 20, 2024
39a6141
Merge branch 'openimsdk:main' into main
withchao Mar 6, 2024
c2009ce
Merge branch 'openimsdk:main' into main
withchao Mar 7, 2024
34c7f22
Merge branch 'openimsdk:main' into main
withchao Mar 8, 2024
ea9c75b
Merge branch 'openimsdk:main' into main
withchao Mar 13, 2024
7b5279a
fix: component
withchao Mar 13, 2024
34daf13
fix: getConversationInfo
withchao Mar 13, 2024
65d58ae
Merge branch 'openimsdk:main' into main
withchao Mar 13, 2024
e601b5f
Merge remote-tracking branch 'upstream/main'
withchao Mar 20, 2024
5f72b1e
Merge remote-tracking branch 'origin/main'
withchao Mar 20, 2024
20dfafd
Merge branch 'openimsdk:main' into main
withchao Apr 24, 2024
dc57d38
feat: cron task
withchao Apr 25, 2024
2f13149
feat: cron task
withchao Apr 25, 2024
2e43950
feat: cron task
withchao Apr 25, 2024
7a29a85
feat: cron task
withchao Apr 26, 2024
3920c06
feat: cron task
withchao Apr 26, 2024
bed7a60
Merge branch 'openimsdk:main' into main
withchao Apr 26, 2024
f2fbf19
Merge remote-tracking branch 'refs/remotes/origin/main' into corn37
withchao Apr 26, 2024
01851df
Merge remote-tracking branch 'upstream/main'
withchao Apr 28, 2024
480ccc7
fix: minio config url recognition error
withchao Apr 28, 2024
1046323
Merge branch 'openimsdk:main' into main
withchao May 8, 2024
76c6fe8
Merge branch 'openimsdk:main' into main
withchao May 10, 2024
6b119bd
Merge branch 'openimsdk:main' into main
withchao May 17, 2024
61740d4
Merge branch 'openimsdk:main' into main
withchao May 30, 2024
17dad5c
Merge branch 'openimsdk:main' into main
withchao Jun 12, 2024
5156795
Merge branch 'openimsdk:main' into main
withchao Jun 17, 2024
4781cf4
Merge branch 'openimsdk:main' into main
withchao Jul 1, 2024
bfbfb78
Merge branch 'openimsdk:main' into main
withchao Jul 3, 2024
8d4737c
update gomake version
withchao Jul 3, 2024
0b4c802
update gomake version
withchao Jul 3, 2024
a464750
Merge branch 'openimsdk:main' into main
withchao Jul 4, 2024
ab84d77
Merge branch 'openimsdk:main' into main
withchao Jul 5, 2024
df9bbeb
Merge branch 'openimsdk:main' into main
withchao Jul 9, 2024
fdb6ea7
Merge branch 'openimsdk:main' into main
withchao Jul 10, 2024
18cc83c
Merge branch 'openimsdk:main' into main
withchao Jul 11, 2024
33de30b
Merge branch 'openimsdk:main' into main
withchao Jul 15, 2024
71ba69b
Merge branch 'openimsdk:main' into main
withchao Jul 15, 2024
904c669
Merge branch 'openimsdk:main' into main
withchao Jul 17, 2024
6df8412
Merge branch 'openimsdk:main' into main
withchao Jul 17, 2024
2ce8d22
fix: seq conversion bug
withchao Jul 17, 2024
d521f0b
fix: redis pipe exec
withchao Jul 17, 2024
87b88e0
Merge branch 'openimsdk:main' into main
withchao Jul 17, 2024
091f9c3
Merge branch 'openimsdk:main' into main
withchao Jul 18, 2024
4d42fe9
Merge branch 'openimsdk:main' into main
withchao Jul 19, 2024
eca9abe
Merge branch 'openimsdk:main' into main
withchao Jul 22, 2024
f7bcbad
Merge branch 'openimsdk:main' into main
withchao Jul 24, 2024
0885703
Merge branch 'openimsdk:main' into main
withchao Jul 25, 2024
57b35ad
Merge branch 'openimsdk:main' into main
withchao Jul 26, 2024
ae5e433
Merge branch 'openimsdk:main' into main
withchao Jul 29, 2024
882d24b
fix: ImportFriends
withchao Jul 29, 2024
82c903a
Merge branch 'openimsdk:main' into main
withchao Jul 29, 2024
e4817d6
Merge branch 'openimsdk:main' into main
withchao Jul 31, 2024
9f1587f
Merge branch 'openimsdk:main' into main
withchao Jul 31, 2024
3b4d9f7
Merge branch 'openimsdk:main' into main
withchao Aug 1, 2024
e2bf593
Merge branch 'openimsdk:main' into main
withchao Aug 2, 2024
9e907e5
Merge branch 'openimsdk:main' into main
withchao Aug 6, 2024
31f1476
Merge branch 'openimsdk:main' into main
withchao Aug 13, 2024
0f86c4b
fix: A large number of logs keysAndValues ​​length is not even
withchao Aug 16, 2024
1fcb46b
Merge branch 'openimsdk:main' into main
withchao Aug 16, 2024
e112a4d
Merge branch 'openimsdk:main' into main
withchao Aug 19, 2024
62f58de
Merge branch 'openimsdk:main' into main
withchao Aug 21, 2024
c14dcb7
Merge branch 'openimsdk:main' into main
withchao Aug 23, 2024
6cf385b
feat: mark read aggregate write
withchao Aug 23, 2024
3b6463d
feat: online status supports redis cluster
withchao Aug 26, 2024
d2e1675
feat: online status supports redis cluster
withchao Aug 26, 2024
d0dcd9a
Merge remote-tracking branch 'upstream/main'
withchao Aug 26, 2024
770dac3
feat: online status supports redis cluster
withchao Aug 26, 2024
b909967
merge
withchao Aug 26, 2024
54a4241
Merge remote-tracking branch 'upstream/main'
withchao Aug 26, 2024
5f8713a
merge
withchao Aug 26, 2024
764b47e
read seq is written to mongo
withchao Aug 26, 2024
12bbd88
read seq is written to mongo
withchao Aug 26, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -319,8 +319,7 @@ github.com/onsi/gomega v1.25.0 h1:Vw7br2PCDYijJHSfBOWhov+8cAnUf8MfMaIOV323l6Y=
github.com/onsi/gomega v1.25.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM=
github.com/openimsdk/gomake v0.0.14-alpha.5 h1:VY9c5x515lTfmdhhPjMvR3BBRrRquAUCFsz7t7vbv7Y=
github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI=
github.com/openimsdk/protocol v0.0.72-alpha.8 h1:MhxSsdxXx2ZaeSLQk4uFftsB5L2rPh1Qup+dURQNzXQ=
github.com/openimsdk/protocol v0.0.72-alpha.8/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8=
github.com/openimsdk/protocol v0.0.72-alpha.9 h1:Dyx4vs88IU4rJ2YcP/TdYp4ww8JjsMkV89hB/Eazx+A=
github.com/openimsdk/protocol v0.0.72-alpha.9/go.mod h1:OZQA9FR55lseYoN2Ql1XAHYKHJGu7OMNkUbuekrKCM8=
github.com/openimsdk/tools v0.0.49-alpha.55 h1:KPgC53oqiwZYssLKljhtXbWXifMlTj2SSQEusj4Uf4k=
github.com/openimsdk/tools v0.0.49-alpha.55/go.mod h1:h1cYmfyaVtgFbKmb1Cfsl8XwUOMTt8ubVUQrdGtsUh4=
Expand Down
57 changes: 57 additions & 0 deletions internal/msgtransfer/online_history_msg_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package msgtransfer

import (
"context"
"encoding/json"
"errors"
"github.com/IBM/sarama"
"github.com/go-redis/redis"
Expand Down Expand Up @@ -89,6 +90,7 @@ func NewOnlineHistoryRedisConsumerHandler(kafkaConf *config.Kafka, database cont
och.conversationRpcClient = conversationRpcClient
och.groupRpcClient = groupRpcClient
och.historyConsumerGroup = historyConsumerGroup

return &och, err
}
func (och *OnlineHistoryRedisConsumerHandler) do(ctx context.Context, channelID int, val *batcher.Msg[sarama.ConsumerMessage]) {
Expand All @@ -97,6 +99,7 @@ func (och *OnlineHistoryRedisConsumerHandler) do(ctx context.Context, channelID
ctx = withAggregationCtx(ctx, ctxMessages)
log.ZInfo(ctx, "msg arrived channel", "channel id", channelID, "msgList length", len(ctxMessages),
"key", val.Key())
och.doSetReadSeq(ctx, ctxMessages)

storageMsgList, notStorageMsgList, storageNotificationList, notStorageNotificationList :=
och.categorizeMessageLists(ctxMessages)
Expand All @@ -110,6 +113,60 @@ func (och *OnlineHistoryRedisConsumerHandler) do(ctx context.Context, channelID
och.handleNotification(ctx, val.Key(), conversationIDNotification, storageNotificationList, notStorageNotificationList)
}

func (och *OnlineHistoryRedisConsumerHandler) doSetReadSeq(ctx context.Context, msgs []*ContextMsg) {
type seqKey struct {
conversationID string
userID string
}
var readSeq map[seqKey]int64
for _, msg := range msgs {
if msg.message.ContentType != constant.HasReadReceipt {
continue
}
var elem sdkws.NotificationElem
if err := json.Unmarshal(msg.message.Content, &elem); err != nil {
log.ZError(ctx, "handlerConversationRead Unmarshal NotificationElem msg err", err, "msg", msg)
continue
}
var tips sdkws.MarkAsReadTips
if err := json.Unmarshal([]byte(elem.Detail), &tips); err != nil {
log.ZError(ctx, "handlerConversationRead Unmarshal MarkAsReadTips msg err", err, "msg", msg)
continue
}
if len(tips.Seqs) > 0 {
for _, seq := range tips.Seqs {
if tips.HasReadSeq < seq {
tips.HasReadSeq = seq
}
}
clear(tips.Seqs)
tips.Seqs = nil
}
if tips.HasReadSeq < 0 {
continue
}
if readSeq == nil {
readSeq = make(map[seqKey]int64)
}
key := seqKey{
conversationID: tips.ConversationID,
userID: tips.MarkAsReadUserID,
}
if readSeq[key] > tips.HasReadSeq {
continue
}
readSeq[key] = tips.HasReadSeq
}
if readSeq == nil {
return
}
for key, seq := range readSeq {
if err := och.msgDatabase.SetHasReadSeqToDB(ctx, key.userID, key.conversationID, seq); err != nil {
log.ZError(ctx, "set read seq to db error", err, "userID", key.userID, "conversationID", key.conversationID, "seq", seq)
}
}
}

func (och *OnlineHistoryRedisConsumerHandler) parseConsumerMessages(ctx context.Context, consumerMessages []*sarama.ConsumerMessage) []*ContextMsg {
var ctxMessages []*ContextMsg
for i := 0; i < len(consumerMessages); i++ {
Expand Down
1 change: 0 additions & 1 deletion internal/msgtransfer/online_msg_to_mongo_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package msgtransfer

import (
"context"

"github.com/IBM/sarama"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
Expand Down
29 changes: 29 additions & 0 deletions internal/push/a_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package push

import (
"github.com/openimsdk/protocol/sdkws"
"testing"
)

func TestName(t *testing.T) {
var c ConsumerHandler
c.readCh = make(chan *sdkws.MarkAsReadTips)

go c.loopRead()

go func() {
for i := 0; ; i++ {
seq := int64(i + 1)
if seq%3 == 0 {
seq = 1
}
c.readCh <- &sdkws.MarkAsReadTips{
ConversationID: "c100",
MarkAsReadUserID: "u100",
HasReadSeq: seq,
}
}
}()

select {}
}
1 change: 1 addition & 0 deletions pkg/apistruct/msg_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
package apistruct
26 changes: 19 additions & 7 deletions pkg/common/storage/cache/redis/online.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/openimsdk/tools/log"
"github.com/redis/go-redis/v9"
"strconv"
"strings"
"time"
)

Expand Down Expand Up @@ -66,11 +67,10 @@ func (s *userOnline) SetUserOnline(ctx context.Context, userID string, online, o
local change = (num1 ~= num2) or (num2 ~= num3)
if change then
local members = redis.call("ZRANGE", key, 0, -1)
table.insert(members, KEYS[2])
redis.call("PUBLISH", KEYS[3], table.concat(members, ":"))
return 1
table.insert(members, "1")
return members
else
return 0
return {"0"}
end
`
now := time.Now()
Expand All @@ -82,12 +82,24 @@ func (s *userOnline) SetUserOnline(ctx context.Context, userID string, online, o
for _, platformID := range online {
argv = append(argv, platformID)
}
keys := []string{s.getUserOnlineKey(userID), userID, s.channelName}
status, err := s.rdb.Eval(ctx, script, keys, argv).Result()
keys := []string{s.getUserOnlineKey(userID)}
platformIDs, err := s.rdb.Eval(ctx, script, keys, argv).StringSlice()
if err != nil {
log.ZError(ctx, "redis SetUserOnline", err, "userID", userID, "online", online, "offline", offline)
return err
}
log.ZDebug(ctx, "redis SetUserOnline", "userID", userID, "online", online, "offline", offline, "status", status)
if len(platformIDs) == 0 {
return errs.ErrInternalServer.WrapMsg("SetUserOnline redis lua invalid return value")
}
if platformIDs[len(platformIDs)-1] != "0" {
log.ZDebug(ctx, "redis SetUserOnline push", "userID", userID, "online", online, "offline", offline, "platformIDs", platformIDs[:len(platformIDs)-1])
platformIDs[len(platformIDs)-1] = userID
msg := strings.Join(platformIDs, ":")
if err := s.rdb.Publish(ctx, s.channelName, msg).Err(); err != nil {
return errs.Wrap(err)
}
} else {
log.ZDebug(ctx, "redis SetUserOnline not push", "userID", userID, "online", online, "offline", offline)
}
return nil
}
51 changes: 51 additions & 0 deletions pkg/common/storage/cache/redis/online_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package redis

import (
"context"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/tools/db/redisutil"
"testing"
"time"
)

/*
address: [ 172.16.8.48:7001, 172.16.8.48:7002, 172.16.8.48:7003, 172.16.8.48:7004, 172.16.8.48:7005, 172.16.8.48:7006 ]
username:
password: passwd123
clusterMode: true
db: 0
maxRetry: 10
*/
func TestName111111(t *testing.T) {
conf := config.Redis{
Address: []string{
"172.16.8.124:7001",
"172.16.8.124:7002",
"172.16.8.124:7003",
"172.16.8.124:7004",
"172.16.8.124:7005",
"172.16.8.124:7006",
},
ClusterMode: true,
Password: "passwd123",
//Address: []string{"localhost:16379"},
//Password: "openIM123",
}
ctx, cancel := context.WithTimeout(context.Background(), time.Second*1000)
defer cancel()
rdb, err := redisutil.NewRedisClient(ctx, conf.Build())
if err != nil {
panic(err)
}
online := NewUserOnline(rdb)

userID := "a123456"
t.Log(online.GetOnline(ctx, userID))
t.Log(online.SetUserOnline(ctx, userID, []int32{1, 2, 3, 4}, nil))
t.Log(online.GetOnline(ctx, userID))

}

func TestName111(t *testing.T) {

}
24 changes: 11 additions & 13 deletions pkg/common/storage/cache/redis/seq_user.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,17 +74,22 @@ func (s *seqUserCacheRedis) GetUserReadSeq(ctx context.Context, conversationID s
}

func (s *seqUserCacheRedis) SetUserReadSeq(ctx context.Context, conversationID string, userID string, seq int64) error {
if seq%s.readSeqWriteRatio == 0 {
if err := s.mgo.SetUserReadSeq(ctx, conversationID, userID, seq); err != nil {
return err
}
dbSeq, err := s.GetUserReadSeq(ctx, conversationID, userID)
if err != nil {
return err
}
if err := s.rocks.RawSet(ctx, s.getSeqUserReadSeqKey(conversationID, userID), strconv.Itoa(int(seq)), s.readExpireTime); err != nil {
return errs.Wrap(err)
if dbSeq < seq {
if err := s.rocks.RawSet(ctx, s.getSeqUserReadSeqKey(conversationID, userID), strconv.Itoa(int(seq)), s.readExpireTime); err != nil {
return errs.Wrap(err)
}
}
return nil
}

func (s *seqUserCacheRedis) SetUserReadSeqToDB(ctx context.Context, conversationID string, userID string, seq int64) error {
return s.mgo.SetUserReadSeq(ctx, conversationID, userID, seq)
}

func (s *seqUserCacheRedis) SetUserMinSeqs(ctx context.Context, userID string, seqs map[string]int64) error {
keys := make([]string, 0, len(seqs))
for conversationID, seq := range seqs {
Expand Down Expand Up @@ -128,13 +133,6 @@ func (s *seqUserCacheRedis) SetUserReadSeqs(ctx context.Context, userID string,
if err := s.setUserRedisReadSeqs(ctx, userID, seqs); err != nil {
return err
}
for conversationID, seq := range seqs {
if seq%s.readSeqWriteRatio == 0 {
if err := s.mgo.SetUserReadSeq(ctx, conversationID, userID, seq); err != nil {
return err
}
}
}
return nil
}

Expand Down
1 change: 1 addition & 0 deletions pkg/common/storage/cache/seq_user.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ type SeqUser interface {
SetUserMinSeq(ctx context.Context, conversationID string, userID string, seq int64) error
GetUserReadSeq(ctx context.Context, conversationID string, userID string) (int64, error)
SetUserReadSeq(ctx context.Context, conversationID string, userID string, seq int64) error
SetUserReadSeqToDB(ctx context.Context, conversationID string, userID string, seq int64) error
SetUserMinSeqs(ctx context.Context, userID string, seqs map[string]int64) error
SetUserReadSeqs(ctx context.Context, userID string, seqs map[string]int64) error
GetUserReadSeqs(ctx context.Context, userID string, conversationIDs []string) (map[string]int64, error)
Expand Down
5 changes: 5 additions & 0 deletions pkg/common/storage/controller/msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ type CommonMsgDatabase interface {

SetUserConversationsMinSeqs(ctx context.Context, userID string, seqs map[string]int64) (err error)
SetHasReadSeq(ctx context.Context, userID string, conversationID string, hasReadSeq int64) error
SetHasReadSeqToDB(ctx context.Context, userID string, conversationID string, hasReadSeq int64) error
GetHasReadSeqs(ctx context.Context, userID string, conversationIDs []string) (map[string]int64, error)
GetHasReadSeq(ctx context.Context, userID string, conversationID string) (int64, error)
UserSetHasReadSeqs(ctx context.Context, userID string, hasReadSeqs map[string]int64) error
Expand Down Expand Up @@ -808,6 +809,10 @@ func (db *commonMsgDatabase) SetHasReadSeq(ctx context.Context, userID string, c
return db.seqUser.SetUserReadSeq(ctx, conversationID, userID, hasReadSeq)
}

func (db *commonMsgDatabase) SetHasReadSeqToDB(ctx context.Context, userID string, conversationID string, hasReadSeq int64) error {
return db.seqUser.SetUserReadSeqToDB(ctx, conversationID, userID, hasReadSeq)
}

func (db *commonMsgDatabase) GetHasReadSeqs(ctx context.Context, userID string, conversationIDs []string) (map[string]int64, error) {
return db.seqUser.GetUserReadSeqs(ctx, userID, conversationIDs)
}
Expand Down
7 changes: 7 additions & 0 deletions pkg/common/storage/database/mgo/seq_user.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,5 +115,12 @@ func (s *seqUserMongo) GetUserReadSeqs(ctx context.Context, userID string, conve
}

func (s *seqUserMongo) SetUserReadSeq(ctx context.Context, conversationID string, userID string, seq int64) error {
dbSeq, err := s.GetUserReadSeq(ctx, conversationID, userID)
if err != nil {
return err
}
if dbSeq > seq {
return nil
}
return s.setSeq(ctx, conversationID, userID, seq, "read_seq")
}
Loading