From 8b79a7685c16f18ace4099c7c0c430f6fa6deb53 Mon Sep 17 00:00:00 2001 From: chao <48119764+withchao@users.noreply.github.com> Date: Thu, 26 Dec 2024 17:49:05 +0800 Subject: [PATCH 1/3] feat: support message cache (#3007) * pb * fix: Modifying other fields while setting IsPrivateChat does not take effect * fix: quote message error revoke * refactoring scheduled tasks * refactoring scheduled tasks * refactoring scheduled tasks * refactoring scheduled tasks * refactoring scheduled tasks * refactoring scheduled tasks * upgrading pkg tools * redis msg cache * redis msg cache * redis msg cache * redis msg cache * redis msg cache * redis msg cache * redis msg cache --- internal/msgtransfer/init.go | 2 +- .../online_msg_to_mongo_handler.go | 20 +- internal/rpc/msg/delete.go | 6 +- internal/rpc/msg/revoke.go | 5 +- internal/rpc/msg/server.go | 2 +- pkg/common/storage/cache/cachekey/msg.go | 42 +- pkg/common/storage/cache/msg.go | 19 +- pkg/common/storage/cache/redis/msg.go | 180 ++----- pkg/common/storage/cache/redis/msg_test.go | 133 ----- pkg/common/storage/controller/msg.go | 458 +++++------------- pkg/common/storage/controller/msg_transfer.go | 35 +- pkg/common/storage/database/mgo/msg.go | 422 ++-------------- pkg/common/storage/database/msg.go | 12 +- pkg/common/storage/model/msg.go | 4 + 14 files changed, 277 insertions(+), 1063 deletions(-) delete mode 100644 pkg/common/storage/cache/redis/msg_test.go diff --git a/internal/msgtransfer/init.go b/internal/msgtransfer/init.go index c4a9a89c62..fcd6152dc1 100644 --- a/internal/msgtransfer/init.go +++ b/internal/msgtransfer/init.go @@ -108,11 +108,11 @@ func Start(ctx context.Context, index int, config *Config) error { cm.Watch(ctx) } - msgModel := redis.NewMsgCache(rdb) msgDocModel, err := mgo.NewMsgMongo(mgocli.GetDB()) if err != nil { return err } + msgModel := redis.NewMsgCache(rdb, msgDocModel) seqConversation, err := mgo.NewSeqConversationMongo(mgocli.GetDB()) if err != nil { return err diff --git a/internal/msgtransfer/online_msg_to_mongo_handler.go b/internal/msgtransfer/online_msg_to_mongo_handler.go index 82002c26b9..d8836d54eb 100644 --- a/internal/msgtransfer/online_msg_to_mongo_handler.go +++ b/internal/msgtransfer/online_msg_to_mongo_handler.go @@ -77,27 +77,13 @@ func (mc *OnlineHistoryMongoConsumerHandler) handleChatWs2Mongo(ctx context.Cont for _, msg := range msgFromMQ.MsgData { seqs = append(seqs, msg.Seq) } - err = mc.msgTransferDatabase.DeleteMessagesFromCache(ctx, msgFromMQ.ConversationID, seqs) - if err != nil { - log.ZError( - ctx, - "remove cache msg from redis err", - err, - "msg", - msgFromMQ.MsgData, - "conversationID", - msgFromMQ.ConversationID, - ) - } } -func (*OnlineHistoryMongoConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil } +func (*OnlineHistoryMongoConsumerHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil } + func (*OnlineHistoryMongoConsumerHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil } -func (mc *OnlineHistoryMongoConsumerHandler) ConsumeClaim( - sess sarama.ConsumerGroupSession, - claim sarama.ConsumerGroupClaim, -) error { // an instance in the consumer group +func (mc *OnlineHistoryMongoConsumerHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { // an instance in the consumer group log.ZDebug(context.Background(), "online new session msg come", "highWaterMarkOffset", claim.HighWaterMarkOffset(), "topic", claim.Topic(), "partition", claim.Partition()) for msg := range claim.Messages() { diff --git 
a/internal/rpc/msg/delete.go b/internal/rpc/msg/delete.go index f45713ff12..d3485faaa8 100644 --- a/internal/rpc/msg/delete.go +++ b/internal/rpc/msg/delete.go @@ -106,10 +106,8 @@ func (m *msgServer) DeleteMsgPhysical(ctx context.Context, req *msg.DeleteMsgPhy return nil, err } remainTime := timeutil.GetCurrentTimestampBySecond() - req.Timestamp - for _, conversationID := range req.ConversationIDs { - if err := m.MsgDatabase.DeleteConversationMsgsAndSetMinSeq(ctx, conversationID, remainTime); err != nil { - log.ZWarn(ctx, "DeleteConversationMsgsAndSetMinSeq error", err, "conversationID", conversationID, "err", err) - } + if _, err := m.DestructMsgs(ctx, &msg.DestructMsgsReq{Timestamp: remainTime, Limit: 9999}); err != nil { + return nil, err } return &msg.DeleteMsgPhysicalResp{}, nil } diff --git a/internal/rpc/msg/revoke.go b/internal/rpc/msg/revoke.go index b7cc7df62e..97de0f48a7 100644 --- a/internal/rpc/msg/revoke.go +++ b/internal/rpc/msg/revoke.go @@ -63,7 +63,8 @@ func (m *msgServer) RevokeMsg(ctx context.Context, req *msg.RevokeMsgReq) (*msg. log.ZDebug(ctx, "GetMsgBySeqs", "conversationID", req.ConversationID, "seq", req.Seq, "msg", string(data)) var role int32 if !authverify.IsAppManagerUid(ctx, m.config.Share.IMAdminUserID) { - switch msgs[0].SessionType { + sessionType := msgs[0].SessionType + switch sessionType { case constant.SingleChatType: if err := authverify.CheckAccessV3(ctx, msgs[0].SendID, m.config.Share.IMAdminUserID); err != nil { return nil, err @@ -89,7 +90,7 @@ func (m *msgServer) RevokeMsg(ctx context.Context, req *msg.RevokeMsgReq) (*msg. role = member.RoleLevel } default: - return nil, errs.ErrInternalServer.WrapMsg("msg sessionType not supported") + return nil, errs.ErrInternalServer.WrapMsg("msg sessionType not supported", "sessionType", sessionType) } } now := time.Now().UnixMilli() diff --git a/internal/rpc/msg/server.go b/internal/rpc/msg/server.go index 991fb87044..7ccda6bd57 100644 --- a/internal/rpc/msg/server.go +++ b/internal/rpc/msg/server.go @@ -89,7 +89,7 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg if err != nil { return err } - msgModel := redis.NewMsgCache(rdb) + msgModel := redis.NewMsgCache(rdb, msgDocModel) seqConversation, err := mgo.NewSeqConversationMongo(mgocli.GetDB()) if err != nil { return err diff --git a/pkg/common/storage/cache/cachekey/msg.go b/pkg/common/storage/cache/cachekey/msg.go index 8e05b64f1f..ac449df38e 100644 --- a/pkg/common/storage/cache/cachekey/msg.go +++ b/pkg/common/storage/cache/cachekey/msg.go @@ -15,50 +15,16 @@ package cachekey import ( - "github.com/openimsdk/protocol/constant" "strconv" ) const ( - messageCache = "MESSAGE_CACHE:" - messageDelUserList = "MESSAGE_DEL_USER_LIST:" - userDelMessagesList = "USER_DEL_MESSAGES_LIST:" - sendMsgFailedFlag = "SEND_MSG_FAILED_FLAG:" - exTypeKeyLocker = "EX_LOCK:" - reactionExSingle = "EX_SINGLE_" - reactionWriteGroup = "EX_GROUP_" - reactionReadGroup = "EX_SUPER_GROUP_" - reactionNotification = "EX_NOTIFICATION_" + sendMsgFailedFlag = "SEND_MSG_FAILED_FLAG:" + messageCache = "MSG_CACHE:" ) -func GetMessageCacheKey(conversationID string, seq int64) string { - return messageCache + conversationID + "_" + strconv.Itoa(int(seq)) -} - -func GetMessageDelUserListKey(conversationID string, seq int64) string { - return messageDelUserList + conversationID + ":" + strconv.Itoa(int(seq)) -} - -func GetUserDelListKey(conversationID, userID string) string { - return userDelMessagesList + conversationID + ":" + userID -} - -func 
GetMessageReactionExKey(clientMsgID string, sessionType int32) string { - switch sessionType { - case constant.SingleChatType: - return reactionExSingle + clientMsgID - case constant.WriteGroupChatType: - return reactionWriteGroup + clientMsgID - case constant.ReadGroupChatType: - return reactionReadGroup + clientMsgID - case constant.NotificationChatType: - return reactionNotification + clientMsgID - } - - return "" -} -func GetLockMessageTypeKey(clientMsgID string, TypeKey string) string { - return exTypeKeyLocker + clientMsgID + "_" + TypeKey +func GetMsgCacheKey(conversationID string, seq int64) string { + return messageCache + conversationID + ":" + strconv.Itoa(int(seq)) } func GetSendMsgKey(id string) string { diff --git a/pkg/common/storage/cache/msg.go b/pkg/common/storage/cache/msg.go index 00eb28c02e..271ed19fe3 100644 --- a/pkg/common/storage/cache/msg.go +++ b/pkg/common/storage/cache/msg.go @@ -16,23 +16,14 @@ package cache import ( "context" - "time" - - "github.com/openimsdk/protocol/sdkws" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" ) type MsgCache interface { - GetMessagesBySeq(ctx context.Context, conversationID string, seqs []int64) (seqMsg []*sdkws.MsgData, failedSeqList []int64, err error) - SetMessagesToCache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (int, error) - DeleteMessagesFromCache(ctx context.Context, conversationID string, seqs []int64) error SetSendMsgStatus(ctx context.Context, id string, status int32) error GetSendMsgStatus(ctx context.Context, id string) (int32, error) - JudgeMessageReactionExist(ctx context.Context, clientMsgID string, sessionType int32) (bool, error) - GetOneMessageAllReactionList(ctx context.Context, clientMsgID string, sessionType int32) (map[string]string, error) - DeleteOneMessageKey(ctx context.Context, clientMsgID string, sessionType int32, subKey string) error - SetMessageReactionExpire(ctx context.Context, clientMsgID string, sessionType int32, expiration time.Duration) (bool, error) - GetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey string) (string, error) - SetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey, value string) error - LockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error - UnLockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error + + GetMessageBySeqs(ctx context.Context, conversationID string, seqs []int64) ([]*model.MsgInfoModel, error) + DelMessageBySeqs(ctx context.Context, conversationID string, seqs []int64) error + SetMessageBySeqs(ctx context.Context, conversationID string, msgs []*model.MsgInfoModel) error } diff --git a/pkg/common/storage/cache/redis/msg.go b/pkg/common/storage/cache/redis/msg.go index b04bc5c357..0651f02830 100644 --- a/pkg/common/storage/cache/redis/msg.go +++ b/pkg/common/storage/cache/redis/msg.go @@ -2,10 +2,12 @@ package redis import ( "context" + "encoding/json" + "github.com/dtm-labs/rockscache" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey" - "github.com/openimsdk/open-im-server/v3/pkg/msgprocessor" - "github.com/openimsdk/protocol/sdkws" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" + "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/utils/datautil" "github.com/redis/go-redis/v9" @@ -13,76 +15,26 @@ 
import ( ) // // msgCacheTimeout is expiration time of message cache, 86400 seconds -const msgCacheTimeout = 86400 +const msgCacheTimeout = time.Hour * 24 -func NewMsgCache(client redis.UniversalClient) cache.MsgCache { - return &msgCache{rdb: client} +func NewMsgCache(client redis.UniversalClient, db database.Msg) cache.MsgCache { + return &msgCache{ + rdb: client, + rcClient: rockscache.NewClient(client, *GetRocksCacheOptions()), + msgDocDatabase: db, + } } type msgCache struct { - rdb redis.UniversalClient -} - -func (c *msgCache) getMessageCacheKey(conversationID string, seq int64) string { - return cachekey.GetMessageCacheKey(conversationID, seq) -} -func (c *msgCache) getMessageDelUserListKey(conversationID string, seq int64) string { - return cachekey.GetMessageDelUserListKey(conversationID, seq) -} - -func (c *msgCache) getUserDelList(conversationID, userID string) string { - return cachekey.GetUserDelListKey(conversationID, userID) + rdb redis.UniversalClient + rcClient *rockscache.Client + msgDocDatabase database.Msg } func (c *msgCache) getSendMsgKey(id string) string { return cachekey.GetSendMsgKey(id) } -func (c *msgCache) getLockMessageTypeKey(clientMsgID string, TypeKey string) string { - return cachekey.GetLockMessageTypeKey(clientMsgID, TypeKey) -} - -func (c *msgCache) getMessageReactionExPrefix(clientMsgID string, sessionType int32) string { - return cachekey.GetMessageReactionExKey(clientMsgID, sessionType) -} - -func (c *msgCache) SetMessagesToCache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (int, error) { - msgMap := datautil.SliceToMap(msgs, func(msg *sdkws.MsgData) string { - return c.getMessageCacheKey(conversationID, msg.Seq) - }) - keys := datautil.Slice(msgs, func(msg *sdkws.MsgData) string { - return c.getMessageCacheKey(conversationID, msg.Seq) - }) - err := ProcessKeysBySlot(ctx, c.rdb, keys, func(ctx context.Context, slot int64, keys []string) error { - var values []string - for _, key := range keys { - if msg, ok := msgMap[key]; ok { - s, err := msgprocessor.Pb2String(msg) - if err != nil { - return err - } - values = append(values, s) - } - } - return LuaSetBatchWithCommonExpire(ctx, c.rdb, keys, values, msgCacheTimeout) - }) - if err != nil { - return 0, err - } - return len(msgs), nil -} - -func (c *msgCache) DeleteMessagesFromCache(ctx context.Context, conversationID string, seqs []int64) error { - var keys []string - for _, seq := range seqs { - keys = append(keys, c.getMessageCacheKey(conversationID, seq)) - } - - return ProcessKeysBySlot(ctx, c.rdb, keys, func(ctx context.Context, slot int64, keys []string) error { - return LuaDeleteBatch(ctx, c.rdb, keys) - }) -} - func (c *msgCache) SetSendMsgStatus(ctx context.Context, id string, status int32) error { return errs.Wrap(c.rdb.Set(ctx, c.getSendMsgKey(id), status, time.Hour*24).Err()) } @@ -92,81 +44,53 @@ func (c *msgCache) GetSendMsgStatus(ctx context.Context, id string) (int32, erro return int32(result), errs.Wrap(err) } -func (c *msgCache) LockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error { - key := c.getLockMessageTypeKey(clientMsgID, TypeKey) - return errs.Wrap(c.rdb.SetNX(ctx, key, 1, time.Minute).Err()) -} - -func (c *msgCache) UnLockMessageTypeKey(ctx context.Context, clientMsgID string, TypeKey string) error { - key := c.getLockMessageTypeKey(clientMsgID, TypeKey) - return errs.Wrap(c.rdb.Del(ctx, key).Err()) +func (c *msgCache) GetMessageBySeqs(ctx context.Context, conversationID string, seqs []int64) ([]*model.MsgInfoModel, error) { 
+ if len(seqs) == 0 { + return nil, nil + } + getKey := func(seq int64) string { + return cachekey.GetMsgCacheKey(conversationID, seq) + } + getMsgID := func(msg *model.MsgInfoModel) int64 { + return msg.Msg.Seq + } + find := func(ctx context.Context, seqs []int64) ([]*model.MsgInfoModel, error) { + return c.msgDocDatabase.FindSeqs(ctx, conversationID, seqs) + } + return batchGetCache2(ctx, c.rcClient, msgCacheTimeout, seqs, getKey, getMsgID, find) } -func (c *msgCache) JudgeMessageReactionExist(ctx context.Context, clientMsgID string, sessionType int32) (bool, error) { - n, err := c.rdb.Exists(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType)).Result() +func (c *msgCache) DelMessageBySeqs(ctx context.Context, conversationID string, seqs []int64) error { + if len(seqs) == 0 { + return nil + } + keys := datautil.Slice(seqs, func(seq int64) string { + return cachekey.GetMsgCacheKey(conversationID, seq) + }) + slotKeys, err := groupKeysBySlot(ctx, getRocksCacheRedisClient(c.rcClient), keys) if err != nil { - return false, errs.Wrap(err) + return err } - - return n > 0, nil -} - -func (c *msgCache) SetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey, value string) error { - return errs.Wrap(c.rdb.HSet(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType), typeKey, value).Err()) -} - -func (c *msgCache) SetMessageReactionExpire(ctx context.Context, clientMsgID string, sessionType int32, expiration time.Duration) (bool, error) { - val, err := c.rdb.Expire(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType), expiration).Result() - return val, errs.Wrap(err) -} - -func (c *msgCache) GetMessageTypeKeyValue(ctx context.Context, clientMsgID string, sessionType int32, typeKey string) (string, error) { - val, err := c.rdb.HGet(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType), typeKey).Result() - return val, errs.Wrap(err) -} - -func (c *msgCache) GetOneMessageAllReactionList(ctx context.Context, clientMsgID string, sessionType int32) (map[string]string, error) { - val, err := c.rdb.HGetAll(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType)).Result() - return val, errs.Wrap(err) -} - -func (c *msgCache) DeleteOneMessageKey(ctx context.Context, clientMsgID string, sessionType int32, subKey string) error { - return errs.Wrap(c.rdb.HDel(ctx, c.getMessageReactionExPrefix(clientMsgID, sessionType), subKey).Err()) + for _, keys := range slotKeys { + if err := c.rcClient.TagAsDeletedBatch2(ctx, keys); err != nil { + return err + } + } + return nil } -func (c *msgCache) GetMessagesBySeq(ctx context.Context, conversationID string, seqs []int64) (seqMsgs []*sdkws.MsgData, failedSeqs []int64, err error) { - var keys []string - keySeqMap := make(map[string]int64, 10) - for _, seq := range seqs { - key := c.getMessageCacheKey(conversationID, seq) - keys = append(keys, key) - keySeqMap[key] = seq - } - err = ProcessKeysBySlot(ctx, c.rdb, keys, func(ctx context.Context, slot int64, keys []string) error { - result, err := LuaGetBatch(ctx, c.rdb, keys) +func (c *msgCache) SetMessageBySeqs(ctx context.Context, conversationID string, msgs []*model.MsgInfoModel) error { + for _, msg := range msgs { + if msg == nil || msg.Msg == nil || msg.Msg.Seq <= 0 { + continue + } + data, err := json.Marshal(msg) if err != nil { return err } - for i, value := range result { - seq := keySeqMap[keys[i]] - if value == nil { - failedSeqs = append(failedSeqs, seq) - continue - } - - msg := &sdkws.MsgData{} - msgString, ok := value.(string) - if !ok || 
msgprocessor.String2Pb(msgString, msg) != nil { - failedSeqs = append(failedSeqs, seq) - continue - } - seqMsgs = append(seqMsgs, msg) - + if err := c.rcClient.RawSet(ctx, cachekey.GetMsgCacheKey(conversationID, msg.Msg.Seq), string(data), msgCacheTimeout); err != nil { + return err } - return nil - }) - if err != nil { - return nil, nil, err } - return seqMsgs, failedSeqs, nil + return nil } diff --git a/pkg/common/storage/cache/redis/msg_test.go b/pkg/common/storage/cache/redis/msg_test.go deleted file mode 100644 index 10b9ce18b0..0000000000 --- a/pkg/common/storage/cache/redis/msg_test.go +++ /dev/null @@ -1,133 +0,0 @@ -// Copyright © 2023 OpenIM. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -package redis - -import ( - "context" - "fmt" - "github.com/openimsdk/protocol/sdkws" - "github.com/redis/go-redis/v9" - "github.com/stretchr/testify/assert" - "google.golang.org/protobuf/proto" - "testing" -) - -func Test_msgCache_SetMessagesToCache(t *testing.T) { - type fields struct { - rdb redis.UniversalClient - } - type args struct { - ctx context.Context - conversationID string - msgs []*sdkws.MsgData - } - tests := []struct { - name string - fields fields - args args - want int - wantErr assert.ErrorAssertionFunc - }{ - {"test1", fields{rdb: redis.NewClient(&redis.Options{Addr: "localhost:16379", Username: "", Password: "openIM123", DB: 0})}, args{context.Background(), - "cid", []*sdkws.MsgData{{Seq: 1}, {Seq: 2}, {Seq: 3}}}, 3, assert.NoError}, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - c := &msgCache{ - rdb: tt.fields.rdb, - } - got, err := c.SetMessagesToCache(tt.args.ctx, tt.args.conversationID, tt.args.msgs) - if !tt.wantErr(t, err, fmt.Sprintf("SetMessagesToCache(%v, %v, %v)", tt.args.ctx, tt.args.conversationID, tt.args.msgs)) { - return - } - assert.Equalf(t, tt.want, got, "SetMessagesToCache(%v, %v, %v)", tt.args.ctx, tt.args.conversationID, tt.args.msgs) - }) - } -} - -func Test_msgCache_GetMessagesBySeq(t *testing.T) { - type fields struct { - rdb redis.UniversalClient - } - type args struct { - ctx context.Context - conversationID string - seqs []int64 - } - var failedSeq []int64 - tests := []struct { - name string - fields fields - args args - wantSeqMsgs []*sdkws.MsgData - wantFailedSeqs []int64 - wantErr assert.ErrorAssertionFunc - }{ - {"test1", fields{rdb: redis.NewClient(&redis.Options{Addr: "localhost:16379", Password: "openIM123", DB: 0})}, - args{context.Background(), "cid", []int64{1, 2, 3}}, - []*sdkws.MsgData{{Seq: 1}, {Seq: 2}, {Seq: 3}}, failedSeq, assert.NoError}, - {"test2", fields{rdb: redis.NewClient(&redis.Options{Addr: "localhost:16379", Password: "openIM123", DB: 0})}, - args{context.Background(), "cid", []int64{4, 5, 6}}, - nil, []int64{4, 5, 6}, assert.NoError}, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - c := &msgCache{ - rdb: tt.fields.rdb, - } - gotSeqMsgs, gotFailedSeqs, err := c.GetMessagesBySeq(tt.args.ctx, tt.args.conversationID, tt.args.seqs) - if 
!tt.wantErr(t, err, fmt.Sprintf("GetMessagesBySeq(%v, %v, %v)", tt.args.ctx, tt.args.conversationID, tt.args.seqs)) { - return - } - equalMsgDataSlices(t, tt.wantSeqMsgs, gotSeqMsgs) - assert.Equalf(t, tt.wantFailedSeqs, gotFailedSeqs, "GetMessagesBySeq(%v, %v, %v)", tt.args.ctx, tt.args.conversationID, tt.args.seqs) - }) - } -} - -func equalMsgDataSlices(t *testing.T, expected, actual []*sdkws.MsgData) { - assert.Equal(t, len(expected), len(actual), "Slices have different lengths") - for i := range expected { - assert.True(t, proto.Equal(expected[i], actual[i]), "Element %d not equal: expected %v, got %v", i, expected[i], actual[i]) - } -} - -func Test_msgCache_DeleteMessagesFromCache(t *testing.T) { - type fields struct { - rdb redis.UniversalClient - } - type args struct { - ctx context.Context - conversationID string - seqs []int64 - } - tests := []struct { - name string - fields fields - args args - wantErr assert.ErrorAssertionFunc - }{ - {"test1", fields{rdb: redis.NewClient(&redis.Options{Addr: "localhost:16379", Password: "openIM123"})}, - args{context.Background(), "cid", []int64{1, 2, 3}}, assert.NoError}, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - c := &msgCache{ - rdb: tt.fields.rdb, - } - tt.wantErr(t, c.DeleteMessagesFromCache(tt.args.ctx, tt.args.conversationID, tt.args.seqs), - fmt.Sprintf("DeleteMessagesFromCache(%v, %v, %v)", tt.args.ctx, tt.args.conversationID, tt.args.seqs)) - }) - } -} diff --git a/pkg/common/storage/controller/msg.go b/pkg/common/storage/controller/msg.go index c29544c333..d5ad12584c 100644 --- a/pkg/common/storage/controller/msg.go +++ b/pkg/common/storage/controller/msg.go @@ -18,6 +18,9 @@ import ( "context" "encoding/json" "errors" + "github.com/openimsdk/tools/utils/jsonutil" + "strconv" + "strings" "time" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" @@ -36,7 +39,6 @@ import ( "github.com/openimsdk/tools/log" "github.com/openimsdk/tools/mq/kafka" "github.com/openimsdk/tools/utils/datautil" - "github.com/openimsdk/tools/utils/timeutil" ) const ( @@ -54,12 +56,8 @@ type CommonMsgDatabase interface { GetMsgBySeqsRange(ctx context.Context, userID string, conversationID string, begin, end, num, userMaxSeq int64) (minSeq int64, maxSeq int64, seqMsg []*sdkws.MsgData, err error) // GetMsgBySeqs retrieves messages for large groups from MongoDB by sequence numbers. GetMsgBySeqs(ctx context.Context, userID string, conversationID string, seqs []int64) (minSeq int64, maxSeq int64, seqMsg []*sdkws.MsgData, err error) - // DeleteConversationMsgsAndSetMinSeq deletes conversation messages and resets the minimum sequence number. If `remainTime` is 0, all messages are deleted (this method does not delete Redis - // cache). + GetMessagesBySeqWithBounds(ctx context.Context, userID string, conversationID string, seqs []int64, pullOrder sdkws.PullOrder) (bool, int64, []*sdkws.MsgData, error) - DeleteConversationMsgsAndSetMinSeq(ctx context.Context, conversationID string, remainTime int64) error - // ClearUserMsgs marks messages for deletion based on clear time and returns a list of sequence numbers for marked messages. - ClearUserMsgs(ctx context.Context, userID string, conversationID string, clearTime int64, lastMsgClearTime time.Time) (seqs []int64, err error) // DeleteUserMsgsBySeqs allows a user to delete messages based on sequence numbers. 
DeleteUserMsgsBySeqs(ctx context.Context, userID string, conversationID string, seqs []int64) error // DeleteMsgsPhysicalBySeqs physically deletes messages by emptying them based on sequence numbers. @@ -80,8 +78,6 @@ type CommonMsgDatabase interface { GetMaxSeqWithTime(ctx context.Context, conversationID string) (database.SeqTime, error) GetCacheMaxSeqWithTime(ctx context.Context, conversationIDs []string) (map[string]database.SeqTime, error) - //GetMongoMaxAndMinSeq(ctx context.Context, conversationID string) (minSeqMongo, maxSeqMongo int64, err error) - //GetConversationMinMaxSeqInMongoAndCache(ctx context.Context, conversationID string) (minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache int64, err error) SetSendMsgStatus(ctx context.Context, id string, status int32) error GetSendMsgStatus(ctx context.Context, id string) (int32, error) SearchMessage(ctx context.Context, req *pbmsg.SearchMessageReq) (total int64, msgData []*pbmsg.SearchedMsgData, err error) @@ -92,10 +88,6 @@ type CommonMsgDatabase interface { RangeUserSendCount(ctx context.Context, start time.Time, end time.Time, group bool, ase bool, pageNumber int32, showNumber int32) (msgCount int64, userCount int64, users []*model.UserCount, dateCount map[string]int64, err error) RangeGroupSendCount(ctx context.Context, start time.Time, end time.Time, ase bool, pageNumber int32, showNumber int32) (msgCount int64, userCount int64, groups []*model.GroupCount, dateCount map[string]int64, err error) - ConvertMsgsDocLen(ctx context.Context, conversationIDs []string) - - // get Msg when destruct msg before - //DeleteDocMsgBefore(ctx context.Context, ts int64, doc *model.MsgDocModel) ([]int, error) GetRandBeforeMsg(ctx context.Context, ts int64, limit int) ([]*model.MsgDocModel, error) @@ -118,7 +110,7 @@ func NewCommonMsgDatabase(msgDocModel database.Msg, msg cache.MsgCache, seqUser } return &commonMsgDatabase{ msgDocDatabase: msgDocModel, - msg: msg, + msgCache: msg, seqUser: seqUser, seqConversation: seqConversation, producer: producerToRedis, @@ -128,7 +120,7 @@ func NewCommonMsgDatabase(msgDocModel database.Msg, msg cache.MsgCache, seqUser type commonMsgDatabase struct { msgDocDatabase database.Msg msgTable model.MsgDocModel - msg cache.MsgCache + msgCache cache.MsgCache seqConversation cache.SeqConversationCache seqUser cache.SeqUser producer *kafka.Producer @@ -139,7 +131,7 @@ func (db *commonMsgDatabase) MsgToMQ(ctx context.Context, key string, msg2mq *sd return err } -func (db *commonMsgDatabase) BatchInsertBlock(ctx context.Context, conversationID string, fields []any, key int8, firstSeq int64) error { +func (db *commonMsgDatabase) batchInsertBlock(ctx context.Context, conversationID string, fields []any, key int8, firstSeq int64) error { if len(fields) == 0 { return nil } @@ -237,11 +229,15 @@ func (db *commonMsgDatabase) BatchInsertBlock(ctx context.Context, conversationI tryUpdate = false // The current block is inserted successfully, and the next block is inserted preferentially i += insert - 1 // Skip the inserted data } + return nil } func (db *commonMsgDatabase) RevokeMsg(ctx context.Context, conversationID string, seq int64, revoke *model.RevokeModel) error { - return db.BatchInsertBlock(ctx, conversationID, []any{revoke}, updateKeyRevoke, seq) + if err := db.batchInsertBlock(ctx, conversationID, []any{revoke}, updateKeyRevoke, seq); err != nil { + return err + } + return db.msgCache.DelMessageBySeqs(ctx, conversationID, []int64{seq}) } func (db *commonMsgDatabase) MarkSingleChatMsgsAsRead(ctx context.Context, userID 
string, conversationID string, totalSeqs []int64) error { @@ -256,23 +252,17 @@ func (db *commonMsgDatabase) MarkSingleChatMsgsAsRead(ctx context.Context, userI return err } } - return nil + return db.msgCache.DelMessageBySeqs(ctx, conversationID, totalSeqs) } func (db *commonMsgDatabase) getMsgBySeqs(ctx context.Context, userID, conversationID string, seqs []int64) (totalMsgs []*sdkws.MsgData, err error) { - for docID, seqs := range db.msgTable.GetDocIDSeqsMap(conversationID, seqs) { - // log.ZDebug(ctx, "getMsgBySeqs", "docID", docID, "seqs", seqs) - msgs, err := db.findMsgInfoBySeq(ctx, userID, docID, conversationID, seqs) - if err != nil { - return nil, err - } - for _, msg := range msgs { - totalMsgs = append(totalMsgs, convert.MsgDB2Pb(msg.Msg)) - } - } - return totalMsgs, nil + return db.GetMessageBySeqs(ctx, conversationID, userID, seqs) } + func (db *commonMsgDatabase) handlerDBMsg(ctx context.Context, cache map[int64][]*model.MsgInfoModel, userID, conversationID string, msg *model.MsgInfoModel) { + if msg == nil || msg.Msg == nil { + return + } if msg.IsRead { msg.Msg.IsRead = true } @@ -360,9 +350,6 @@ func (db *commonMsgDatabase) handlerDBMsg(ctx context.Context, cache map[int64][ return } msg.Msg.Content = string(data) - //if _, err := db.msgDocDatabase.UpdateMsg(ctx, db.msgTable.GetDocID(conversationID, msg.Msg.Seq), db.msgTable.GetMsgIndex(msg.Msg.Seq), "msg", msg.Msg); err != nil { - // log.ZError(ctx, "UpdateMsgContent", err) - //} } func (db *commonMsgDatabase) findMsgInfoBySeq(ctx context.Context, userID, docID string, conversationID string, seqs []int64) (totalMsgs []*model.MsgInfoModel, err error) { @@ -377,24 +364,6 @@ func (db *commonMsgDatabase) findMsgInfoBySeq(ctx context.Context, userID, docID return msgs, err } -func (db *commonMsgDatabase) getMsgBySeqsRange(ctx context.Context, userID string, conversationID string, allSeqs []int64, begin, end int64) (seqMsgs []*sdkws.MsgData, err error) { - log.ZDebug(ctx, "getMsgBySeqsRange", "conversationID", conversationID, "allSeqs", allSeqs, "begin", begin, "end", end) - for docID, seqs := range db.msgTable.GetDocIDSeqsMap(conversationID, allSeqs) { - log.ZDebug(ctx, "getMsgBySeqsRange", "docID", docID, "seqs", seqs) - msgs, err := db.findMsgInfoBySeq(ctx, userID, docID, conversationID, seqs) - if err != nil { - return nil, err - } - for _, msg := range msgs { - if msg.IsRead { - msg.Msg.IsRead = true - } - seqMsgs = append(seqMsgs, convert.MsgDB2Pb(msg.Msg)) - } - } - return seqMsgs, nil -} - // GetMsgBySeqsRange In the context of group chat, we have the following parameters: // // "maxSeq" of a conversation: It represents the maximum value of messages in the group conversation. @@ -463,37 +432,10 @@ func (db *commonMsgDatabase) GetMsgBySeqsRange(ctx context.Context, userID strin seqs = append(seqs, i) } } - - if len(seqs) == 0 { - return 0, 0, nil, nil - } - newBegin := seqs[0] - newEnd := seqs[len(seqs)-1] - var successMsgs []*sdkws.MsgData - log.ZDebug(ctx, "GetMsgBySeqsRange", "first seqs", seqs, "newBegin", newBegin, "newEnd", newEnd) - cachedMsgs, failedSeqs, err := db.msg.GetMessagesBySeq(ctx, conversationID, seqs) - if err != nil && !errors.Is(err, redis.Nil) { - log.ZError(ctx, "get message from redis exception", err, "conversationID", conversationID, "seqs", seqs) - } - successMsgs = append(successMsgs, cachedMsgs...) 
- log.ZDebug(ctx, "get msgs from cache", "cachedMsgs", cachedMsgs) - // get from cache or db - - if len(failedSeqs) > 0 { - log.ZDebug(ctx, "msgs not exist in redis", "seqs", failedSeqs) - mongoMsgs, err := db.getMsgBySeqsRange(ctx, userID, conversationID, failedSeqs, begin, end) - if err != nil { - - return 0, 0, nil, err - } - successMsgs = append(mongoMsgs, successMsgs...) - - //_, err = db.msg.SetMessagesToCache(ctx, conversationID, mongoMsgs) - //if err != nil { - // return 0, 0, nil, err - //} + successMsgs, err := db.GetMessageBySeqs(ctx, conversationID, userID, seqs) + if err != nil { + return 0, 0, nil, err } - return minSeq, maxSeq, successMsgs, nil } @@ -529,31 +471,9 @@ func (db *commonMsgDatabase) GetMsgBySeqs(ctx context.Context, userID string, co newSeqs = append(newSeqs, seq) } } - if len(newSeqs) == 0 { - return minSeq, maxSeq, nil, nil - } - successMsgs, failedSeqs, err := db.msg.GetMessagesBySeq(ctx, conversationID, newSeqs) + successMsgs, err := db.GetMessageBySeqs(ctx, conversationID, userID, newSeqs) if err != nil { - if !errors.Is(err, redis.Nil) { - log.ZWarn(ctx, "get message from redis exception", err, "failedSeqs", failedSeqs, "conversationID", conversationID) - } - } - log.ZDebug(ctx, "db.seq.GetMessagesBySeq", "userID", userID, "conversationID", conversationID, "seqs", - seqs, "len(successMsgs)", len(successMsgs), "failedSeqs", failedSeqs) - - if len(failedSeqs) > 0 { - mongoMsgs, err := db.getMsgBySeqs(ctx, userID, conversationID, failedSeqs) - if err != nil { - - return 0, 0, nil, err - } - - successMsgs = append(successMsgs, mongoMsgs...) - - //_, err = db.msg.SetMessagesToCache(ctx, conversationID, mongoMsgs) - //if err != nil { - // return 0, 0, nil, err - //} + return 0, 0, nil, err } return minSeq, maxSeq, successMsgs, nil } @@ -607,174 +527,14 @@ func (db *commonMsgDatabase) GetMessagesBySeqWithBounds(ctx context.Context, use if len(newSeqs) == 0 { return isEnd, endSeq, nil, nil } - successMsgs, failedSeqs, err := db.msg.GetMessagesBySeq(ctx, conversationID, newSeqs) + successMsgs, err := db.GetMessageBySeqs(ctx, conversationID, userID, newSeqs) if err != nil { - if !errors.Is(err, redis.Nil) { - log.ZWarn(ctx, "get message from redis exception", err, "failedSeqs", failedSeqs, "conversationID", conversationID) - } - } - log.ZDebug(ctx, "db.seq.GetMessagesBySeq", "userID", userID, "conversationID", conversationID, "seqs", - seqs, "len(successMsgs)", len(successMsgs), "failedSeqs", failedSeqs) - - if len(failedSeqs) > 0 { - mongoMsgs, err := db.getMsgBySeqs(ctx, userID, conversationID, failedSeqs) - if err != nil { - - return false, 0, nil, err - } - - successMsgs = append(successMsgs, mongoMsgs...) 
- - //_, err = db.msg.SetMessagesToCache(ctx, conversationID, mongoMsgs) - //if err != nil { - // return 0, 0, nil, err - //} + return false, 0, nil, err } return isEnd, endSeq, successMsgs, nil } -func (db *commonMsgDatabase) DeleteConversationMsgsAndSetMinSeq(ctx context.Context, conversationID string, remainTime int64) error { - var delStruct delMsgRecursionStruct - var skip int64 - minSeq, err := db.deleteMsgRecursion(ctx, conversationID, skip, &delStruct, remainTime) - if err != nil { - return err - } - log.ZDebug(ctx, "DeleteConversationMsgsAndSetMinSeq", "conversationID", conversationID, "minSeq", minSeq) - if minSeq == 0 { - return nil - } - return db.seqConversation.SetMinSeq(ctx, conversationID, minSeq) -} - -func (db *commonMsgDatabase) ClearUserMsgs(ctx context.Context, userID string, conversationID string, clearTime int64, lastMsgClearTime time.Time) (seqs []int64, err error) { - var index int64 - for { - // from oldest 2 newest, ASC - msgDocModel, err := db.msgDocDatabase.GetMsgDocModelByIndex(ctx, conversationID, index, 1) - if err != nil || msgDocModel.DocID == "" { - if err != nil { - if err == model.ErrMsgListNotExist { - log.ZDebug(ctx, "not doc find", "conversationID", conversationID, "userID", userID, "index", index) - } else { - log.ZError(ctx, "deleteMsgRecursion GetUserMsgListByIndex failed", err, "conversationID", conversationID, "index", index) - } - } - // If an error is reported, or the error cannot be obtained, it is physically deleted and seq delMongoMsgsPhysical(delStruct.delDocIDList) is returned to end the recursion - break - } - - index++ - - // && msgDocModel.Msg[0].Msg.SendTime > lastMsgClearTime.UnixMilli() - if len(msgDocModel.Msg) > 0 { - i := 0 - var over bool - for _, msg := range msgDocModel.Msg { - i++ - // over clear time, need to clear - if msg != nil && msg.Msg != nil && msg.Msg.SendTime+clearTime*1000 <= time.Now().UnixMilli() { - // if msg is not in del list, add to del list - if msg.Msg.SendTime+clearTime*1000 > lastMsgClearTime.UnixMilli() && !datautil.Contain(userID, msg.DelList...) { - seqs = append(seqs, msg.Msg.Seq) - } - } else { - log.ZDebug(ctx, "all msg need destruct is found", "conversationID", conversationID, "userID", userID, "index", index, "stop index", i) - over = true - break - } - } - if over { - break - } - } - } - - log.ZDebug(ctx, "ClearUserMsgs", "conversationID", conversationID, "userID", userID, "seqs", seqs) - - // have msg need to destruct - if len(seqs) > 0 { - // update min seq to clear after - userMinSeq := seqs[len(seqs)-1] + 1 // user min seq when clear after - currentUserMinSeq, err := db.seqUser.GetUserMinSeq(ctx, conversationID, userID) // user min seq when clear before - if err != nil { - return nil, err - } - - // if before < after, update min seq - if currentUserMinSeq < userMinSeq { - if err := db.seqUser.SetUserMinSeq(ctx, conversationID, userID, userMinSeq); err != nil { - return nil, err - } - } - } - return seqs, nil -} - -// this is struct for recursion. -type delMsgRecursionStruct struct { - minSeq int64 - delDocIDs []string -} - -func (d *delMsgRecursionStruct) getSetMinSeq() int64 { - return d.minSeq -} - -// index 0....19(del) 20...69 -// seq 70 -// set minSeq 21 -// recursion deletes the list and returns the set minimum seq. 
-func (db *commonMsgDatabase) deleteMsgRecursion(ctx context.Context, conversationID string, index int64, delStruct *delMsgRecursionStruct, remainTime int64) (int64, error) { - // find from oldest list - msgDocModel, err := db.msgDocDatabase.GetMsgDocModelByIndex(ctx, conversationID, index, 1) - if err != nil || msgDocModel.DocID == "" { - if err != nil { - if err == model.ErrMsgListNotExist { - log.ZDebug(ctx, "deleteMsgRecursion ErrMsgListNotExist", "conversationID", conversationID, "index:", index) - } else { - log.ZError(ctx, "deleteMsgRecursion GetUserMsgListByIndex failed", err, "conversationID", conversationID, "index", index) - } - } - // If an error is reported, or the error cannot be obtained, it is physically deleted and seq delMongoMsgsPhysical(delStruct.delDocIDList) is returned to end the recursion - err = db.msgDocDatabase.DeleteDocs(ctx, delStruct.delDocIDs) - if err != nil { - return 0, err - } - return delStruct.getSetMinSeq() + 1, nil - } - log.ZDebug(ctx, "doc info", "conversationID", conversationID, "index", index, "docID", msgDocModel.DocID, "len", len(msgDocModel.Msg)) - if int64(len(msgDocModel.Msg)) > db.msgTable.GetSingleGocMsgNum() { - log.ZWarn(ctx, "msgs too large", nil, "length", len(msgDocModel.Msg), "docID:", msgDocModel.DocID) - } - if msgDocModel.IsFull() && msgDocModel.Msg[len(msgDocModel.Msg)-1].Msg.SendTime+(remainTime*1000) < timeutil.GetCurrentTimestampByMill() { - log.ZDebug(ctx, "doc is full and all msg is expired", "docID", msgDocModel.DocID) - delStruct.delDocIDs = append(delStruct.delDocIDs, msgDocModel.DocID) - delStruct.minSeq = msgDocModel.Msg[len(msgDocModel.Msg)-1].Msg.Seq - } else { - var delMsgIndexs []int - for i, MsgInfoModel := range msgDocModel.Msg { - if MsgInfoModel != nil && MsgInfoModel.Msg != nil { - if timeutil.GetCurrentTimestampByMill() > MsgInfoModel.Msg.SendTime+(remainTime*1000) { - delMsgIndexs = append(delMsgIndexs, i) - } - } - } - if len(delMsgIndexs) > 0 { - if err = db.msgDocDatabase.DeleteMsgsInOneDocByIndex(ctx, msgDocModel.DocID, delMsgIndexs); err != nil { - log.ZError(ctx, "deleteMsgRecursion DeleteMsgsInOneDocByIndex failed", err, "conversationID", conversationID, "index", index) - } - delStruct.minSeq = int64(msgDocModel.Msg[delMsgIndexs[len(delMsgIndexs)-1]].Msg.Seq) - } - } - seq, err := db.deleteMsgRecursion(ctx, conversationID, index+1, delStruct, remainTime) - return seq, err -} - func (db *commonMsgDatabase) DeleteMsgsPhysicalBySeqs(ctx context.Context, conversationID string, allSeqs []int64) error { - if err := db.msg.DeleteMessagesFromCache(ctx, conversationID, allSeqs); err != nil { - return err - } for docID, seqs := range db.msgTable.GetDocIDSeqsMap(conversationID, allSeqs) { var indexes []int for _, seq := range seqs { @@ -784,13 +544,10 @@ func (db *commonMsgDatabase) DeleteMsgsPhysicalBySeqs(ctx context.Context, conve return err } } - return nil + return db.msgCache.DelMessageBySeqs(ctx, conversationID, allSeqs) } func (db *commonMsgDatabase) DeleteUserMsgsBySeqs(ctx context.Context, userID string, conversationID string, seqs []int64) error { - if err := db.msg.DeleteMessagesFromCache(ctx, conversationID, seqs); err != nil { - return err - } for docID, seqs := range db.msgTable.GetDocIDSeqsMap(conversationID, seqs) { for _, seq := range seqs { if _, err := db.msgDocDatabase.PushUnique(ctx, docID, db.msgTable.GetMsgIndex(seq), "del_list", []string{userID}); err != nil { @@ -798,7 +555,7 @@ func (db *commonMsgDatabase) DeleteUserMsgsBySeqs(ctx context.Context, userID st } } } - return nil + return 
db.msgCache.DelMessageBySeqs(ctx, conversationID, seqs) } func (db *commonMsgDatabase) GetMaxSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error) { @@ -809,11 +566,6 @@ func (db *commonMsgDatabase) GetMaxSeq(ctx context.Context, conversationID strin return db.seqConversation.GetMaxSeq(ctx, conversationID) } -// -//func (db *commonMsgDatabase) SetMinSeq(ctx context.Context, conversationID string, minSeq int64) error { -// return db.seqConversation.SetMinSeq(ctx, conversationID, minSeq) -//} - func (db *commonMsgDatabase) SetMinSeqs(ctx context.Context, seqs map[string]int64) error { return db.seqConversation.SetMinSeqs(ctx, seqs) } @@ -847,11 +599,11 @@ func (db *commonMsgDatabase) GetHasReadSeq(ctx context.Context, userID string, c } func (db *commonMsgDatabase) SetSendMsgStatus(ctx context.Context, id string, status int32) error { - return db.msg.SetSendMsgStatus(ctx, id, status) + return db.msgCache.SetSendMsgStatus(ctx, id, status) } func (db *commonMsgDatabase) GetSendMsgStatus(ctx context.Context, id string) (int32, error) { - return db.msg.GetSendMsgStatus(ctx, id) + return db.msgCache.GetSendMsgStatus(ctx, id) } func (db *commonMsgDatabase) GetConversationMinMaxSeqInMongoAndCache(ctx context.Context, conversationID string) (minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache int64, err error) { @@ -888,26 +640,11 @@ func (db *commonMsgDatabase) GetMinMaxSeqMongo(ctx context.Context, conversation return } -func (db *commonMsgDatabase) RangeUserSendCount( - ctx context.Context, - start time.Time, - end time.Time, - group bool, - ase bool, - pageNumber int32, - showNumber int32, -) (msgCount int64, userCount int64, users []*model.UserCount, dateCount map[string]int64, err error) { +func (db *commonMsgDatabase) RangeUserSendCount(ctx context.Context, start time.Time, end time.Time, group bool, ase bool, pageNumber int32, showNumber int32) (msgCount int64, userCount int64, users []*model.UserCount, dateCount map[string]int64, err error) { return db.msgDocDatabase.RangeUserSendCount(ctx, start, end, group, ase, pageNumber, showNumber) } -func (db *commonMsgDatabase) RangeGroupSendCount( - ctx context.Context, - start time.Time, - end time.Time, - ase bool, - pageNumber int32, - showNumber int32, -) (msgCount int64, userCount int64, groups []*model.GroupCount, dateCount map[string]int64, err error) { +func (db *commonMsgDatabase) RangeGroupSendCount(ctx context.Context, start time.Time, end time.Time, ase bool, pageNumber int32, showNumber int32) (msgCount int64, userCount int64, groups []*model.GroupCount, dateCount map[string]int64, err error) { return db.msgDocDatabase.RangeGroupSendCount(ctx, start, end, ase, pageNumber, showNumber) } @@ -947,43 +684,10 @@ func (db *commonMsgDatabase) FindOneByDocIDs(ctx context.Context, conversationID return totalMsgs, nil } -func (db *commonMsgDatabase) ConvertMsgsDocLen(ctx context.Context, conversationIDs []string) { - db.msgDocDatabase.ConvertMsgsDocLen(ctx, conversationIDs) -} - func (db *commonMsgDatabase) GetRandBeforeMsg(ctx context.Context, ts int64, limit int) ([]*model.MsgDocModel, error) { return db.msgDocDatabase.GetRandBeforeMsg(ctx, ts, limit) } -// -//func (db *commonMsgDatabase) DeleteDocMsgBefore(ctx context.Context, ts int64, doc *model.MsgDocModel) ([]int, error) { -// var notNull int -// index := make([]int, 0, len(doc.Msg)) -// for i, message := range doc.Msg { -// if message.Msg != nil { -// notNull++ -// if message.Msg.SendTime < ts { -// index = append(index, i) -// } -// } -// } -// if len(index) == 0 
{
-//		return index, nil
-//	}
-//	maxSeq := doc.Msg[index[len(index)-1]].Msg.Seq
-//	conversationID := doc.DocID[:strings.LastIndex(doc.DocID, ":")]
-//	if err := db.SetMinSeq(ctx, conversationID, maxSeq+1); err != nil {
-//		return index, err
-//	}
-//	if len(index) == notNull {
-//		log.ZDebug(ctx, "Delete db in Doc", "DocID", doc.DocID, "index", index, "maxSeq", maxSeq)
-//		return index, db.msgDocDatabase.DeleteDoc(ctx, doc.DocID)
-//	} else {
-//		log.ZDebug(ctx, "delete db in index", "DocID", doc.DocID, "index", index, "maxSeq", maxSeq)
-//		return index, db.msgDocDatabase.DeleteMsgByIndex(ctx, doc.DocID, index)
-//	}
-//}
-
 func (db *commonMsgDatabase) SetMinSeq(ctx context.Context, conversationID string, seq int64) error {
 	dbSeq, err := db.seqConversation.GetMinSeq(ctx, conversationID)
 	if err != nil {
@@ -998,10 +702,6 @@ func (db *commonMsgDatabase) SetMinSeq(ctx context.Context, conversationID strin
 	return db.seqConversation.SetMinSeq(ctx, conversationID, seq)
 }
 
-func (db *commonMsgDatabase) GetRandDocIDs(ctx context.Context, limit int) ([]string, error) {
-	return db.msgDocDatabase.GetRandDocIDs(ctx, limit)
-}
-
 func (db *commonMsgDatabase) GetCacheMaxSeqWithTime(ctx context.Context, conversationIDs []string) (map[string]database.SeqTime, error) {
 	return db.seqConversation.GetCacheMaxSeqWithTime(ctx, conversationIDs)
 }
@@ -1016,9 +716,103 @@ func (db *commonMsgDatabase) GetMaxSeqsWithTime(ctx context.Context, conversatio
 }
 
 func (db *commonMsgDatabase) DeleteDoc(ctx context.Context, docID string) error {
-	return db.msgDocDatabase.DeleteDoc(ctx, docID)
+	index := strings.LastIndex(docID, ":")
+	if index <= 0 {
+		return errs.ErrInternalServer.WrapMsg("docID is invalid", "docID", docID)
+	}
+	conversationID := docID[:index]
+	index, err := strconv.Atoi(docID[index+1:])
+	if err != nil {
+		return errs.WrapMsg(err, "strconv.Atoi", "docID", docID)
+	}
+	seqs := make([]int64, db.msgTable.GetSingleGocMsgNum())
+	minSeq := db.msgTable.GetMinSeq(index)
+	for i := range seqs {
+		seqs[i] = minSeq + int64(i)
+	}
+	if err := db.msgDocDatabase.DeleteDoc(ctx, docID); err != nil {
+		return err
+	}
+	return db.msgCache.DelMessageBySeqs(ctx, conversationID, seqs)
 }
 
 func (db *commonMsgDatabase) GetLastMessageSeqByTime(ctx context.Context, conversationID string, time int64) (int64, error) {
 	return db.msgDocDatabase.GetLastMessageSeqByTime(ctx, conversationID, time)
 }
+
+func (db *commonMsgDatabase) handlerDeleteAndRevoked(ctx context.Context, userID string, msgs []*model.MsgInfoModel) {
+	for i := range msgs {
+		msg := msgs[i]
+		if msg == nil || msg.Msg == nil {
+			continue
+		}
+		msg.Msg.IsRead = msg.IsRead
+		if datautil.Contain(userID, msg.DelList...)
{ + msg.Msg.Content = "" + msg.Msg.Status = constant.MsgDeleted + } + if msg.Revoke == nil { + continue + } + msg.Msg.ContentType = constant.MsgRevokeNotification + revokeContent := sdkws.MessageRevokedContent{ + RevokerID: msg.Revoke.UserID, + RevokerRole: msg.Revoke.Role, + ClientMsgID: msg.Msg.ClientMsgID, + RevokerNickname: msg.Revoke.Nickname, + RevokeTime: msg.Revoke.Time, + SourceMessageSendTime: msg.Msg.SendTime, + SourceMessageSendID: msg.Msg.SendID, + SourceMessageSenderNickname: msg.Msg.SenderNickname, + SessionType: msg.Msg.SessionType, + Seq: msg.Msg.Seq, + Ex: msg.Msg.Ex, + } + data, err := jsonutil.JsonMarshal(&revokeContent) + if err != nil { + log.ZWarn(ctx, "handlerDeleteAndRevoked JsonMarshal MessageRevokedContent", err, "msg", msg) + continue + } + elem := sdkws.NotificationElem{ + Detail: string(data), + } + content, err := jsonutil.JsonMarshal(&elem) + if err != nil { + log.ZWarn(ctx, "handlerDeleteAndRevoked JsonMarshal NotificationElem", err, "msg", msg) + continue + } + msg.Msg.Content = string(content) + } +} + +func (db *commonMsgDatabase) handlerQuote(ctx context.Context, userID, conversationID string, msgs []*model.MsgInfoModel) { + temp := make(map[int64][]*model.MsgInfoModel) + for i := range msgs { + db.handlerDBMsg(ctx, temp, userID, conversationID, msgs[i]) + } +} + +func (db *commonMsgDatabase) GetMessageBySeqs(ctx context.Context, conversationID string, userID string, seqs []int64) ([]*sdkws.MsgData, error) { + msgs, err := db.msgCache.GetMessageBySeqs(ctx, conversationID, seqs) + if err != nil { + return nil, err + } + db.handlerDeleteAndRevoked(ctx, userID, msgs) + db.handlerQuote(ctx, userID, conversationID, msgs) + seqMsgs := make(map[int64]*model.MsgInfoModel) + for i, msg := range msgs { + if msg.Msg == nil { + continue + } + seqMsgs[msg.Msg.Seq] = msgs[i] + } + res := make([]*sdkws.MsgData, 0, len(seqs)) + for _, seq := range seqs { + if v, ok := seqMsgs[seq]; ok { + res = append(res, convert.MsgDB2Pb(v.Msg)) + } else { + res = append(res, &sdkws.MsgData{Seq: seq}) + } + } + return res, nil +} diff --git a/pkg/common/storage/controller/msg_transfer.go b/pkg/common/storage/controller/msg_transfer.go index 1ecd786aa3..f4c0c6270f 100644 --- a/pkg/common/storage/controller/msg_transfer.go +++ b/pkg/common/storage/controller/msg_transfer.go @@ -2,10 +2,11 @@ package controller import ( "context" + "github.com/openimsdk/open-im-server/v3/pkg/common/convert" "github.com/openimsdk/protocol/constant" + "github.com/openimsdk/tools/utils/datautil" "github.com/openimsdk/open-im-server/v3/pkg/common/config" - "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" @@ -50,7 +51,7 @@ func NewMsgTransferDatabase(msgDocModel database.Msg, msg cache.MsgCache, seqUse } return &msgTransferDatabase{ msgDocDatabase: msgDocModel, - msg: msg, + msgCache: msg, seqUser: seqUser, seqConversation: seqConversation, producerToMongo: producerToMongo, @@ -61,7 +62,7 @@ func NewMsgTransferDatabase(msgDocModel database.Msg, msg cache.MsgCache, seqUse type msgTransferDatabase struct { msgDocDatabase database.Msg msgTable model.MsgDocModel - msg cache.MsgCache + msgCache cache.MsgCache seqConversation cache.SeqConversationCache seqUser cache.SeqUser producerToMongo *kafka.Producer @@ -73,10 +74,12 @@ func (db *msgTransferDatabase) BatchInsertChat2DB(ctx context.Context, 
conversat return errs.ErrArgs.WrapMsg("msgList is empty") } msgs := make([]any, len(msgList)) + seqs := make([]int64, len(msgList)) for i, msg := range msgList { if msg == nil { continue } + seqs[i] = msg.Seq var offlinePushModel *model.OfflinePushModel if msg.OfflinePushInfo != nil { offlinePushModel = &model.OfflinePushModel{ @@ -114,7 +117,11 @@ func (db *msgTransferDatabase) BatchInsertChat2DB(ctx context.Context, conversat Ex: msg.Ex, } } - return db.BatchInsertBlock(ctx, conversationID, msgs, updateKeyMsg, msgList[0].Seq) + if err := db.BatchInsertBlock(ctx, conversationID, msgs, updateKeyMsg, msgList[0].Seq); err != nil { + return err + } + //return db.msgCache.DelMessageBySeqs(ctx, conversationID, seqs) + return nil } func (db *msgTransferDatabase) BatchInsertBlock(ctx context.Context, conversationID string, fields []any, key int8, firstSeq int64) error { @@ -219,7 +226,7 @@ func (db *msgTransferDatabase) BatchInsertBlock(ctx context.Context, conversatio } func (db *msgTransferDatabase) DeleteMessagesFromCache(ctx context.Context, conversationID string, seqs []int64) error { - return db.msg.DeleteMessagesFromCache(ctx, conversationID, seqs) + return db.msgCache.DelMessageBySeqs(ctx, conversationID, seqs) } func (db *msgTransferDatabase) BatchInsertChat2Cache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (seq int64, isNew bool, userHasReadMap map[string]int64, err error) { @@ -238,20 +245,22 @@ func (db *msgTransferDatabase) BatchInsertChat2Cache(ctx context.Context, conver isNew = currentMaxSeq == 0 lastMaxSeq := currentMaxSeq userSeqMap := make(map[string]int64) + seqs := make([]int64, 0, lenList) for _, m := range msgs { currentMaxSeq++ m.Seq = currentMaxSeq userSeqMap[m.SendID] = m.Seq + seqs = append(seqs, m.Seq) } - - failedNum, err := db.msg.SetMessagesToCache(ctx, conversationID, msgs) - if err != nil { - prommetrics.MsgInsertRedisFailedCounter.Add(float64(failedNum)) - log.ZError(ctx, "setMessageToCache error", err, "len", len(msgs), "conversationID", conversationID) - } else { - prommetrics.MsgInsertRedisSuccessCounter.Inc() + msgToDB := func(msg *sdkws.MsgData) *model.MsgInfoModel { + return &model.MsgInfoModel{ + Msg: convert.MsgPb2DB(msg), + } + } + if err := db.msgCache.SetMessageBySeqs(ctx, conversationID, datautil.Slice(msgs, msgToDB)); err != nil { + return 0, false, nil, err } - return lastMaxSeq, isNew, userSeqMap, errs.Wrap(err) + return lastMaxSeq, isNew, userSeqMap, nil } func (db *msgTransferDatabase) SetHasReadSeqs(ctx context.Context, conversationID string, userSeqMap map[string]int64) error { diff --git a/pkg/common/storage/database/mgo/msg.go b/pkg/common/storage/database/mgo/msg.go index f371766958..03ebff6110 100644 --- a/pkg/common/storage/database/mgo/msg.go +++ b/pkg/common/storage/database/mgo/msg.go @@ -7,15 +7,12 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" - "github.com/openimsdk/tools/utils/datautil" - "golang.org/x/exp/rand" - "github.com/openimsdk/protocol/constant" "github.com/openimsdk/protocol/msg" "github.com/openimsdk/protocol/sdkws" "github.com/openimsdk/tools/db/mongoutil" "github.com/openimsdk/tools/errs" - "github.com/openimsdk/tools/log" + "github.com/openimsdk/tools/utils/datautil" "github.com/openimsdk/tools/utils/jsonutil" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" @@ -42,12 +39,6 @@ type MsgMgo struct { model model.MsgDocModel } -func (m *MsgMgo) PushMsgsToDoc(ctx 
context.Context, docID string, msgsToMongo []model.MsgInfoModel) error { - filter := bson.M{"doc_id": docID} - update := bson.M{"$push": bson.M{"msgs": bson.M{"$each": msgsToMongo}}} - return mongoutil.UpdateOne(ctx, m.coll, filter, update, false) -} - func (m *MsgMgo) Create(ctx context.Context, msg *model.MsgDocModel) error { return mongoutil.InsertMany(ctx, m.coll, []*model.MsgDocModel{msg}) } @@ -80,16 +71,6 @@ func (m *MsgMgo) PushUnique(ctx context.Context, docID string, index int64, key return mongoutil.UpdateOneResult(ctx, m.coll, filter, update) } -func (m *MsgMgo) UpdateMsgContent(ctx context.Context, docID string, index int64, msg []byte) error { - filter := bson.M{"doc_id": docID} - update := bson.M{"$set": bson.M{fmt.Sprintf("msgs.%d.msg", index): msg}} - return mongoutil.UpdateOne(ctx, m.coll, filter, update, false) -} - -func (m *MsgMgo) IsExistDocID(ctx context.Context, docID string) (bool, error) { - return mongoutil.Exist(ctx, m.coll, bson.M{"doc_id": docID}) -} - func (m *MsgMgo) FindOneByDocID(ctx context.Context, docID string) (*model.MsgDocModel, error) { return mongoutil.FindOne[*model.MsgDocModel](ctx, m.coll, bson.M{"doc_id": docID}) } @@ -218,13 +199,6 @@ func (m *MsgMgo) GetOldestMsg(ctx context.Context, conversationID string) (*mode } } -func (m *MsgMgo) DeleteDocs(ctx context.Context, docIDs []string) error { - if len(docIDs) == 0 { - return nil - } - return mongoutil.DeleteMany(ctx, m.coll, bson.M{"doc_id": bson.M{"$in": docIDs}}) -} - func (m *MsgMgo) GetMsgDocModelByIndex(ctx context.Context, conversationID string, index, sort int64) (*model.MsgDocModel, error) { if sort != 1 && sort != -1 { return nil, errs.ErrArgs.WrapMsg("mongo sort must be 1 or -1") @@ -279,95 +253,6 @@ func (m *MsgMgo) MarkSingleChatMsgsAsRead(ctx context.Context, userID string, do return nil } -//func (m *MsgMgo) searchCount(ctx context.Context, filter any) (int64, error) { -// -// return nil, nil -//} - -//func (m *MsgMgo) searchMessage(ctx context.Context, filter any, nextID primitive.ObjectID, content bool, limit int) (int64, []*model.MsgInfoModel, primitive.ObjectID, error) { -// var pipeline bson.A -// if !nextID.IsZero() { -// pipeline = append(pipeline, bson.M{"$match": bson.M{"_id": bson.M{"$gt": nextID}}}) -// } -// pipeline = append(pipeline, -// bson.M{"$match": filter}, -// bson.M{"$limit": limit}, -// bson.M{"$unwind": "$msgs"}, -// bson.M{"$match": filter}, -// bson.M{ -// "$group": bson.M{ -// "_id": "$_id", -// "doc_id": bson.M{ -// "$first": "$doc_id", -// }, -// "msgs": bson.M{"$push": "$msgs"}, -// }, -// }, -// ) -// if !content { -// pipeline = append(pipeline, -// bson.M{ -// "$project": bson.M{ -// "_id": 1, -// "count": bson.M{"$size": "$msgs"}, -// }, -// }, -// ) -// type result struct { -// ID primitive.ObjectID `bson:"_id"` -// Count int64 `bson:"count"` -// } -// res, err := mongoutil.Aggregate[result](ctx, m.coll, pipeline) -// if err != nil { -// return 0, nil, primitive.ObjectID{}, err -// } -// if len(res) == 0 { -// return 0, nil, primitive.ObjectID{}, nil -// } -// var count int64 -// for _, r := range res { -// count += r.Count -// } -// return count, nil, res[len(res)-1].ID, nil -// } -// type result struct { -// ID primitive.ObjectID `bson:"_id"` -// Msg []*model.MsgInfoModel `bson:"msgs"` -// } -// res, err := mongoutil.Aggregate[result](ctx, m.coll, pipeline) -// if err != nil { -// return 0, nil, primitive.ObjectID{}, err -// } -// if len(res) == 0 { -// return 0, nil, primitive.ObjectID{}, err -// } -// var count int -// for _, r := range res { 
-// count += len(r.Msg) -// } -// msgs := make([]*model.MsgInfoModel, 0, count) -// for _, r := range res { -// msgs = append(msgs, r.Msg...) -// } -// return int64(count), msgs, res[len(res)-1].ID, nil -//} - -/* - -db.msg3.aggregate( - [ - { - "$match": { - "doc_id": "si_7009965934_8710838466:0" - }, - - } - ] -) - - -*/ - type searchMessageIndex struct { ID primitive.ObjectID `bson:"_id"` Index []int64 `bson:"index"` @@ -512,22 +397,6 @@ func (m *MsgMgo) searchMessage(ctx context.Context, req *msg.SearchMessageReq) ( } } -func (m *MsgMgo) getDocRange(ctx context.Context, id primitive.ObjectID, index []int64) ([]*model.MsgInfoModel, error) { - if len(index) == 0 { - return nil, nil - } - - pipeline := bson.A{ - bson.M{"$match": bson.M{"_id": id}}, - bson.M{"$project": "$msgs"}, - } - msgs, err := mongoutil.Aggregate[*model.MsgInfoModel](ctx, m.coll, pipeline) - if err != nil { - return nil, err - } - return msgs, nil -} - func (m *MsgMgo) SearchMessage(ctx context.Context, req *msg.SearchMessageReq) (int64, []*model.MsgInfoModel, error) { count, data, err := m.searchMessage(ctx, req) if err != nil { @@ -556,143 +425,6 @@ func (m *MsgMgo) SearchMessage(ctx context.Context, req *msg.SearchMessageReq) ( return count, msgs, nil } -//func (m *MsgMgo) SearchMessage(ctx context.Context, req *msg.SearchMessageReq) (int32, []*model.MsgInfoModel, error) { -// where := make(bson.A, 0, 6) -// if req.RecvID != "" { -// if req.SessionType == constant.ReadGroupChatType { -// where = append(where, bson.M{ -// "$or": bson.A{ -// bson.M{"doc_id": "^n_" + req.RecvID + ":"}, -// bson.M{"doc_id": "^sg_" + req.RecvID + ":"}, -// }, -// }) -// } else { -// where = append(where, bson.M{"msgs.msg.recv_id": req.RecvID}) -// } -// } -// if req.SendID != "" { -// where = append(where, bson.M{"msgs.msg.send_id": req.SendID}) -// } -// if req.ContentType != 0 { -// where = append(where, bson.M{"msgs.msg.content_type": req.ContentType}) -// } -// if req.SessionType != 0 { -// where = append(where, bson.M{"msgs.msg.session_type": req.SessionType}) -// } -// if req.SendTime != "" { -// sendTime, err := time.Parse(time.DateOnly, req.SendTime) -// if err != nil { -// return 0, nil, errs.ErrArgs.WrapMsg("invalid sendTime", "req", req.SendTime, "format", time.DateOnly, "cause", err.Error()) -// } -// where = append(where, -// bson.M{ -// "msgs.msg.send_time": bson.M{ -// "$gte": sendTime.UnixMilli(), -// }, -// }, -// bson.M{ -// "msgs.msg.send_time": bson.M{ -// "$lt": sendTime.Add(time.Hour * 24).UnixMilli(), -// }, -// }, -// ) -// } -// opt := options.Find().SetLimit(100) -// res, err := mongoutil.Find[model.MsgDocModel](ctx, m.coll, bson.M{"$and": where}, opt) -// if err != nil { -// return 0, nil, err -// } -// _ = res -// fmt.Println() -// -// return 0, nil, nil -// pipeline := bson.A{ -// bson.M{ -// "$unwind": "$msgs", -// }, -// } -// if len(where) > 0 { -// pipeline = append(pipeline, bson.M{ -// "$match": bson.M{"$and": where}, -// }) -// } -// pipeline = append(pipeline, -// bson.M{ -// "$project": bson.M{ -// "_id": 0, -// "msg": "$msgs.msg", -// }, -// }, -// bson.M{ -// "$count": "count", -// }, -// ) -// //count, err := mongoutil.Aggregate[int32](ctx, m.coll, pipeline) -// //if err != nil { -// // return 0, nil, err -// //} -// //if len(count) == 0 || count[0] == 0 { -// // return 0, nil, nil -// //} -// count := []int32{0} -// pipeline = pipeline[:len(pipeline)-1] -// pipeline = append(pipeline, -// bson.M{ -// "$skip": (req.Pagination.GetPageNumber() - 1) * req.Pagination.GetShowNumber(), -// }, -// 
bson.M{ -// "$limit": req.Pagination.GetShowNumber(), -// }, -// ) -// msgs, err := mongoutil.Aggregate[*model.MsgInfoModel](ctx, m.coll, pipeline) -// if err != nil { -// return 0, nil, err -// } -// for i := range msgs { -// msgInfo := msgs[i] -// if msgInfo == nil || msgInfo.Msg == nil { -// continue -// } -// if msgInfo.Revoke != nil { -// revokeContent := sdkws.MessageRevokedContent{ -// RevokerID: msgInfo.Revoke.UserID, -// RevokerRole: msgInfo.Revoke.Role, -// ClientMsgID: msgInfo.Msg.ClientMsgID, -// RevokerNickname: msgInfo.Revoke.Nickname, -// RevokeTime: msgInfo.Revoke.Time, -// SourceMessageSendTime: msgInfo.Msg.SendTime, -// SourceMessageSendID: msgInfo.Msg.SendID, -// SourceMessageSenderNickname: msgInfo.Msg.SenderNickname, -// SessionType: msgInfo.Msg.SessionType, -// Seq: msgInfo.Msg.Seq, -// Ex: msgInfo.Msg.Ex, -// } -// data, err := jsonutil.JsonMarshal(&revokeContent) -// if err != nil { -// return 0, nil, errs.WrapMsg(err, "json.Marshal revokeContent") -// } -// elem := sdkws.NotificationElem{Detail: string(data)} -// content, err := jsonutil.JsonMarshal(&elem) -// if err != nil { -// return 0, nil, errs.WrapMsg(err, "json.Marshal elem") -// } -// msgInfo.Msg.ContentType = constant.MsgRevokeNotification -// msgInfo.Msg.Content = string(content) -// } -// } -// //start := (req.Pagination.PageNumber - 1) * req.Pagination.ShowNumber -// //n := int32(len(msgs)) -// //if start >= n { -// // return n, []*relation.MsgInfoModel{}, nil -// //} -// //if start+req.Pagination.ShowNumber < n { -// // msgs = msgs[start : start+req.Pagination.ShowNumber] -// //} else { -// // msgs = msgs[start:] -// //} -// return count[0], msgs, nil -//} - func (m *MsgMgo) RangeUserSendCount(ctx context.Context, start time.Time, end time.Time, group bool, ase bool, pageNumber int32, showNumber int32) (msgCount int64, userCount int64, users []*model.UserCount, dateCount map[string]int64, err error) { var sort int if ase { @@ -1178,94 +910,6 @@ func (m *MsgMgo) RangeGroupSendCount(ctx context.Context, start time.Time, end t return result[0].MsgCount, result[0].UserCount, groups, dateCount, nil } -func (m *MsgMgo) ConvertMsgsDocLen(ctx context.Context, conversationIDs []string) { - for _, conversationID := range conversationIDs { - regex := primitive.Regex{Pattern: fmt.Sprintf("^%s:", conversationID)} - msgDocs, err := mongoutil.Find[*model.MsgDocModel](ctx, m.coll, bson.M{"doc_id": regex}) - if err != nil { - log.ZError(ctx, "convertAll find msg doc failed", err, "conversationID", conversationID) - continue - } - if len(msgDocs) < 1 { - continue - } - log.ZDebug(ctx, "msg doc convert", "conversationID", conversationID, "len(msgDocs)", len(msgDocs)) - if len(msgDocs[0].Msg) == int(m.model.GetSingleGocMsgNum5000()) { - if err := mongoutil.DeleteMany(ctx, m.coll, bson.M{"doc_id": regex}); err != nil { - log.ZError(ctx, "convertAll delete many failed", err, "conversationID", conversationID) - continue - } - var newMsgDocs []any - for _, msgDoc := range msgDocs { - if int64(len(msgDoc.Msg)) == m.model.GetSingleGocMsgNum() { - continue - } - var index int64 - for index < int64(len(msgDoc.Msg)) { - msg := msgDoc.Msg[index] - if msg != nil && msg.Msg != nil { - msgDocModel := model.MsgDocModel{DocID: m.model.GetDocID(conversationID, msg.Msg.Seq)} - end := index + m.model.GetSingleGocMsgNum() - if int(end) >= len(msgDoc.Msg) { - msgDocModel.Msg = msgDoc.Msg[index:] - } else { - msgDocModel.Msg = msgDoc.Msg[index:end] - } - newMsgDocs = append(newMsgDocs, msgDocModel) - index = end - } else { - break - } - } - } 
- if err = mongoutil.InsertMany(ctx, m.coll, newMsgDocs); err != nil { - log.ZError(ctx, "convertAll insert many failed", err, "conversationID", conversationID, "len(newMsgDocs)", len(newMsgDocs)) - } else { - log.ZDebug(ctx, "msg doc convert", "conversationID", conversationID, "len(newMsgDocs)", len(newMsgDocs)) - } - } - } -} - -func (m *MsgMgo) GetRandDocIDs(ctx context.Context, limit int) ([]string, error) { - var skip int - var docIDs []string - var offset int - - count, err := m.coll.CountDocuments(ctx, bson.M{}) - if err != nil { - return nil, err - } - - if count < int64(limit) { - skip = 0 - } else { - rand.Seed(uint64(time.Now().UnixMilli())) - skip = rand.Intn(int(count / int64(limit))) - offset = skip * limit - } - log.ZDebug(ctx, "offset", "skip", skip, "offset", offset) - res, err := mongoutil.Aggregate[*model.MsgDocModel](ctx, m.coll, []bson.M{ - { - "$project": bson.M{ - "doc_id": 1, - }, - }, - { - "$skip": offset, - }, - { - "$limit": limit, - }, - }) - - for _, doc := range res { - docIDs = append(docIDs, doc.DocID) - } - - return docIDs, errs.Wrap(err) -} - func (m *MsgMgo) GetRandBeforeMsg(ctx context.Context, ts int64, limit int) ([]*model.MsgDocModel, error) { return mongoutil.Aggregate[*model.MsgDocModel](ctx, m.coll, []bson.M{ { @@ -1297,18 +941,6 @@ func (m *MsgMgo) GetRandBeforeMsg(ctx context.Context, ts int64, limit int) ([]* }) } -func (m *MsgMgo) DeleteMsgByIndex(ctx context.Context, docID string, index []int) error { - if len(index) == 0 { - return nil - } - model := &model.MsgInfoModel{DelList: []string{}} - set := make(map[string]any) - for i := range index { - set[fmt.Sprintf("msgs.%d", i)] = model - } - return mongoutil.UpdateOne(ctx, m.coll, bson.M{"doc_id": docID}, bson.M{"$set": set}, true) -} - func (m *MsgMgo) DeleteDoc(ctx context.Context, docID string) error { return mongoutil.DeleteOne(ctx, m.coll, bson.M{"doc_id": docID}) } @@ -1364,3 +996,55 @@ func (m *MsgMgo) GetLastMessageSeqByTime(ctx context.Context, conversationID str } return seq, nil } + +func (m *MsgMgo) onlyFindDocIndex(ctx context.Context, docID string, indexes []int64) ([]*model.MsgInfoModel, error) { + if len(indexes) == 0 { + return nil, nil + } + pipeline := mongo.Pipeline{ + bson.D{{Key: "$match", Value: bson.D{ + {Key: "doc_id", Value: docID}, + }}}, + bson.D{{Key: "$project", Value: bson.D{ + {Key: "_id", Value: 0}, + {Key: "doc_id", Value: 1}, + {Key: "msgs", Value: bson.D{ + {Key: "$map", Value: bson.D{ + {Key: "input", Value: indexes}, + {Key: "as", Value: "index"}, + {Key: "in", Value: bson.D{ + {Key: "$arrayElemAt", Value: bson.A{"$msgs", "$$index"}}, + }}, + }}, + }}, + }}}, + } + msgDocModel, err := mongoutil.Aggregate[*model.MsgDocModel](ctx, m.coll, pipeline) + if err != nil { + return nil, err + } + if len(msgDocModel) == 0 { + return nil, nil + } + return msgDocModel[0].Msg, nil +} + +func (m *MsgMgo) FindSeqs(ctx context.Context, conversationID string, seqs []int64) ([]*model.MsgInfoModel, error) { + if len(seqs) == 0 { + return nil, nil + } + result := make([]*model.MsgInfoModel, 0, len(seqs)) + for docID, seqs := range m.model.GetDocIDSeqsMap(conversationID, seqs) { + res, err := m.onlyFindDocIndex(ctx, docID, datautil.Slice(seqs, m.model.GetMsgIndex)) + if err != nil { + return nil, err + } + for i, re := range res { + if re == nil || re.Msg == nil { + continue + } + result = append(result, res[i]) + } + } + return result, nil +} diff --git a/pkg/common/storage/database/msg.go b/pkg/common/storage/database/msg.go index abb2a44c2f..b44e702964 100644 --- 
a/pkg/common/storage/database/msg.go +++ b/pkg/common/storage/database/msg.go @@ -24,30 +24,20 @@ import ( ) type Msg interface { - PushMsgsToDoc(ctx context.Context, docID string, msgsToMongo []model.MsgInfoModel) error Create(ctx context.Context, model *model.MsgDocModel) error UpdateMsg(ctx context.Context, docID string, index int64, key string, value any) (*mongo.UpdateResult, error) PushUnique(ctx context.Context, docID string, index int64, key string, value any) (*mongo.UpdateResult, error) - UpdateMsgContent(ctx context.Context, docID string, index int64, msg []byte) error - IsExistDocID(ctx context.Context, docID string) (bool, error) FindOneByDocID(ctx context.Context, docID string) (*model.MsgDocModel, error) GetMsgBySeqIndexIn1Doc(ctx context.Context, userID, docID string, seqs []int64) ([]*model.MsgInfoModel, error) GetNewestMsg(ctx context.Context, conversationID string) (*model.MsgInfoModel, error) GetOldestMsg(ctx context.Context, conversationID string) (*model.MsgInfoModel, error) - DeleteDocs(ctx context.Context, docIDs []string) error - GetMsgDocModelByIndex(ctx context.Context, conversationID string, index, sort int64) (*model.MsgDocModel, error) DeleteMsgsInOneDocByIndex(ctx context.Context, docID string, indexes []int) error MarkSingleChatMsgsAsRead(ctx context.Context, userID string, docID string, indexes []int64) error SearchMessage(ctx context.Context, req *msg.SearchMessageReq) (int64, []*model.MsgInfoModel, error) RangeUserSendCount(ctx context.Context, start time.Time, end time.Time, group bool, ase bool, pageNumber int32, showNumber int32) (msgCount int64, userCount int64, users []*model.UserCount, dateCount map[string]int64, err error) RangeGroupSendCount(ctx context.Context, start time.Time, end time.Time, ase bool, pageNumber int32, showNumber int32) (msgCount int64, userCount int64, groups []*model.GroupCount, dateCount map[string]int64, err error) - ConvertMsgsDocLen(ctx context.Context, conversationIDs []string) - DeleteDoc(ctx context.Context, docID string) error - DeleteMsgByIndex(ctx context.Context, docID string, index []int) error GetRandBeforeMsg(ctx context.Context, ts int64, limit int) ([]*model.MsgDocModel, error) - - GetRandDocIDs(ctx context.Context, limit int) ([]string, error) - GetLastMessageSeqByTime(ctx context.Context, conversationID string, time int64) (int64, error) + FindSeqs(ctx context.Context, conversationID string, seqs []int64) ([]*model.MsgInfoModel, error) } diff --git a/pkg/common/storage/model/msg.go b/pkg/common/storage/model/msg.go index e16233973b..69113032da 100644 --- a/pkg/common/storage/model/msg.go +++ b/pkg/common/storage/model/msg.go @@ -143,3 +143,7 @@ func (*MsgDocModel) GenExceptionMessageBySeqs(seqs []int64) (exceptionMsg []*sdk } return exceptionMsg } + +func (*MsgDocModel) GetMinSeq(index int) int64 { + return int64(index*singleGocMsgNum) + 1 +} From 4c537321b61f6b2b095f8871a0dc34ef974b816a Mon Sep 17 00:00:00 2001 From: Monet Lee Date: Thu, 26 Dec 2024 17:53:14 +0800 Subject: [PATCH 2/3] docs: improve deployment docs in kubernetes. (#2973) * docs: improve deployment docs in kubernetes. * move docs path. * format contents. * update contents. * build: update deployment env. * docs: update deploy docs. * build: add kafka secret and dependencies. * docs: update deployment docs. * Update docs contents. * update docs contents. 
--- deployments/Readme.md | 189 +++++++++++++++++-
 deployments/deploy/README.md | 85 --------
 deployments/deploy/kafka-secret.yml | 7 +
 deployments/deploy/minio-secret.yml | 8 +
 deployments/deploy/minio-statefulset.yml | 14 +-
 deployments/deploy/mongo-secret.yml | 8 +
 deployments/deploy/mongo-statefulset.yml | 23 ++-
 deployments/deploy/openim-api-deployment.yml | 18 +-
 deployments/deploy/openim-config.yml | 20 +-
 .../deploy/openim-msggateway-deployment.yml | 2 +-
 .../deploy/openim-msgtransfer-deployment.yml | 14 +-
 deployments/deploy/openim-push-deployment.yml | 7 +-
 .../deploy/openim-rpc-auth-deployment.yml | 2 +-
 .../openim-rpc-conversation-deployment.yml | 9 +-
 .../deploy/openim-rpc-friend-deployment.yml | 13 +-
 .../deploy/openim-rpc-group-deployment.yml | 12 +-
 .../deploy/openim-rpc-msg-deployment.yml | 18 +-
 .../deploy/openim-rpc-third-deployment.yml | 15 +-
 .../deploy/openim-rpc-user-deployment.yml | 14 +-
 deployments/deploy/redis-secret.yml | 7 +
 deployments/deploy/redis-statefulset.yml | 11 -
 21 files changed, 329 insertions(+), 167 deletions(-)
 delete mode 100644 deployments/deploy/README.md
 create mode 100644 deployments/deploy/kafka-secret.yml
 create mode 100644 deployments/deploy/minio-secret.yml
 create mode 100644 deployments/deploy/mongo-secret.yml
 create mode 100644 deployments/deploy/redis-secret.yml
diff --git a/deployments/Readme.md b/deployments/Readme.md
index 775baaea0d..8da4f90aa1 100644
--- a/deployments/Readme.md
+++ b/deployments/Readme.md
@@ -1,3 +1,188 @@
-# OpenIM Application Containerization Deployment Guide
+# Kubernetes Deployment
-view deploy [README](./deploy/README.md)
\ No newline at end of file
+
+## Resource Requests
+
+- CPU: 2 cores
+- Memory: 4 GiB
+- Disk usage: 20 GiB (on Node)
+
+## Preconditions
+
+Ensure that you have already deployed the following components:
+
+- Redis
+- MongoDB
+- Kafka
+- MinIO
+
+## Origin Deploy
+
+### Enter the target dir
+
+`cd ./deployments/deploy/`
+
+### Deploy configs and dependencies
+
+Update your ConfigMap `openim-config.yml`. **You can check the official docs for more details.**
+
+In `openim-config.yml`, you need to modify the following configurations:
+
+**discovery.yml**
+
+- `kubernetes.namespace`: defaults to `default`; change it to your namespace if needed.
+
+**mongodb.yml**
+
+- `address`: set to your existing MongoDB address, or to the Mongo Service name and port in your deployment.
+- `database`: set to your MongoDB database name. (The database must already exist.)
+- `authSource`: set to your MongoDB authSource. (authSource specifies the database that holds the user's credentials; the user must be created in that database.)
+
+**kafka.yml**
+
+- `address`: set to your existing Kafka address, or to the Kafka Service name and port in your deployment.
+
+**redis.yml**
+
+- `address`: set to your existing Redis address, or to the Redis Service name and port in your deployment.
+
+**minio.yml**
+
+- `internalAddress`: set to the MinIO Service name and port in your deployment.
+- `externalAddress`: set to the externally exposed MinIO address.
+
+### Set the secret
+
+A Secret is an object that contains a small amount of sensitive data, such as a password or key. Secrets are similar to ConfigMaps.
+
+#### Redis:
+
+Update the `redis-password` value in `redis-secret.yml` to your Redis password encoded in base64.
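+
+For example, you can generate the base64 values with a shell one-liner (a minimal sketch using the default `openIM123` password; `-n` keeps a trailing newline out of the encoded value, and the decode flag is `-d` on GNU coreutils):
+
+```bash
+# Encode a password for use in a Secret manifest
+echo -n 'openIM123' | base64      # prints: b3BlbklNMTIz
+
+# Decode an existing value to verify it
+echo 'b3BlbklNMTIz' | base64 -d   # prints: openIM123
+```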
+
+```yaml
+apiVersion: v1
+kind: Secret
+metadata:
+  name: openim-redis-secret
+type: Opaque
+data:
+  redis-password: b3BlbklNMTIz # update to your redis password encoded in base64; if it should be empty, set it to ""
+```
+
+#### Mongo:
+
+Update the `mongo_openim_username` and `mongo_openim_password` values in `mongo-secret.yml` to your Mongo username and password encoded in base64.
+
+```yaml
+apiVersion: v1
+kind: Secret
+metadata:
+  name: openim-mongo-secret
+type: Opaque
+data:
+  mongo_openim_username: b3BlbklN # update to your mongo username encoded in base64; if it should be empty, set it to "" (these credentials must belong to a user created in the authSource database).
+  mongo_openim_password: b3BlbklNMTIz # update to your mongo password encoded in base64; if it should be empty, set it to ""
+```
+
+#### Minio:
+
+Update the `minio-root-user` and `minio-root-password` values in `minio-secret.yml` to your MinIO accessKeyID and secretAccessKey encoded in base64.
+
+```yaml
+apiVersion: v1
+kind: Secret
+metadata:
+  name: openim-minio-secret
+type: Opaque
+data:
+  minio-root-user: cm9vdA== # update to your minio accessKeyID encoded in base64; if it should be empty, set it to ""
+  minio-root-password: b3BlbklNMTIz # update to your minio secretAccessKey encoded in base64; if it should be empty, set it to ""
+```
+
+#### Kafka:
+
+Update the `kafka-password` value in `kafka-secret.yml` to your Kafka password encoded in base64.
+
+```yaml
+apiVersion: v1
+kind: Secret
+metadata:
+  name: openim-kafka-secret
+type: Opaque
+data:
+  kafka-password: b3BlbklNMTIz # update to your kafka password encoded in base64; if it should be empty, set it to ""
+```
+
+### Apply the secrets
+
+```shell
+kubectl apply -f redis-secret.yml -f minio-secret.yml -f mongo-secret.yml -f kafka-secret.yml
+```
+
+### Apply all config
+
+`kubectl apply -f ./openim-config.yml`
+
+> Attention: If you use the `default` namespace, you can apply `clusterRole.yml` to create a cluster role binding for the default service account.
+>
+> The namespace is configured in `discovery.yml` inside `openim-config.yml`; change `kubernetes.namespace` to your namespace if needed.
+
+**Execute `clusterRole.yml`**
+
+`kubectl apply -f ./clusterRole.yml`
+
+### Run all deployments and services
+
+> Note: Ensure that infrastructure services like MinIO, Redis, and Kafka are running before deploying the main applications.
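+
+Before applying the application manifests, you can quickly confirm that the infrastructure pods are ready (an illustrative check; adjust the pattern to the pod names used in your cluster):
+
+```bash
+# List the infrastructure pods and their readiness status
+kubectl get pods | grep -E 'redis|mongo|kafka|minio'
+
+# Optionally block until every pod in the namespace is Ready
+kubectl wait --for=condition=Ready pod --all --timeout=300s
+```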
+ +```bash +kubectl apply \ + -f openim-api-deployment.yml \ + -f openim-api-service.yml \ + -f openim-crontask-deployment.yml \ + -f openim-rpc-user-deployment.yml \ + -f openim-rpc-user-service.yml \ + -f openim-msggateway-deployment.yml \ + -f openim-msggateway-service.yml \ + -f openim-push-deployment.yml \ + -f openim-push-service.yml \ + -f openim-msgtransfer-service.yml \ + -f openim-msgtransfer-deployment.yml \ + -f openim-rpc-conversation-deployment.yml \ + -f openim-rpc-conversation-service.yml \ + -f openim-rpc-auth-deployment.yml \ + -f openim-rpc-auth-service.yml \ + -f openim-rpc-group-deployment.yml \ + -f openim-rpc-group-service.yml \ + -f openim-rpc-friend-deployment.yml \ + -f openim-rpc-friend-service.yml \ + -f openim-rpc-msg-deployment.yml \ + -f openim-rpc-msg-service.yml \ + -f openim-rpc-third-deployment.yml \ + -f openim-rpc-third-service.yml +``` + +### Verification + +After deploying the services, verify that everything is running smoothly: + +```bash +# Check the status of all pods +kubectl get pods + +# Check the status of services +kubectl get svc + +# Check the status of deployments +kubectl get deployments + +# View all resources +kubectl get all +``` + +### clean all + +`kubectl delete -f ./` + +### Notes: + +- If you use a specific namespace for your deployment, be sure to append the -n flag to your kubectl commands. diff --git a/deployments/deploy/README.md b/deployments/deploy/README.md deleted file mode 100644 index d6b083bc55..0000000000 --- a/deployments/deploy/README.md +++ /dev/null @@ -1,85 +0,0 @@ -# Kubernetes Deployment - -## Resource Requests - -- CPU: 2 cores -- Memory: 4 GiB -- Disk usage: 20 GiB (on Node) - -## Origin Deploy - -1. Enter the target dir - `cd ./deployments/deploy/` - -2. Deploy configs and dependencies - Upate your `openim-config.yml` - -Apply all config and dependencies -`kubectl apply -f ./openim-config.yml` - -> Attation: If you use `default` namespace, you can excute `clusterRile.yml` to create a cluster role binding for default service account. -> -> Namespace is modify to `discovery.yml` in `openim-config.yml`, you can change `kubernetes.namespace` to your namespace. - -Excute `clusterRole.yml` -`kubectl apply -f ./clusterRole.yml` - -Run infrasturcture components. - -`kubectl apply -f minio-service.yml -f minio-statefulset.yml -f mongo-service.yml -f mongo-statefulset.yml -f redis-service.yml -f redis-statefulset.yml -f kafka-service.yml -f kafka-statefulset.yml` - -> Note: Ensure that infrastructure services like MinIO, Redis, and Kafka are running before deploying the main applications. - -3. run all deployments and services - -```bash -kubectl apply \ - -f openim-api-deployment.yml \ - -f openim-api-service.yml \ - -f openim-crontask-deployment.yml \ - -f openim-rpc-user-deployment.yml \ - -f openim-rpc-user-service.yml \ - -f openim-msggateway-deployment.yml \ - -f openim-msggateway-service.yml \ - -f openim-push-deployment.yml \ - -f openim-push-service.yml \ - -f openim-msgtransfer-service.yml \ - -f openim-msgtransfer-deployment.yml \ - -f openim-rpc-conversation-deployment.yml \ - -f openim-rpc-conversation-service.yml \ - -f openim-rpc-auth-deployment.yml \ - -f openim-rpc-auth-service.yml \ - -f openim-rpc-group-deployment.yml \ - -f openim-rpc-group-service.yml \ - -f openim-rpc-friend-deployment.yml \ - -f openim-rpc-friend-service.yml \ - -f openim-rpc-msg-deployment.yml \ - -f openim-rpc-msg-service.yml \ - -f openim-rpc-third-deployment.yml \ - -f openim-rpc-third-service.yml -``` - -4. 
Verification - After deploying the services, verify that everything is running smoothly: - -```bash -# Check the status of all pods -kubectl get pods - -# Check the status of services -kubectl get svc - -# Check the status of deployments -kubectl get deployments - -# View all resources -kubectl get all -``` - -5. clean all - -`kubectl delete -f ./` - -### Notes: - -- If you use a specific namespace for your deployment, be sure to append the -n flag to your kubectl commands. diff --git a/deployments/deploy/kafka-secret.yml b/deployments/deploy/kafka-secret.yml new file mode 100644 index 0000000000..dcee689c86 --- /dev/null +++ b/deployments/deploy/kafka-secret.yml @@ -0,0 +1,7 @@ +apiVersion: v1 +kind: Secret +metadata: + name: openim-kafka-secret +type: Opaque +data: + kafka-password: "" diff --git a/deployments/deploy/minio-secret.yml b/deployments/deploy/minio-secret.yml new file mode 100644 index 0000000000..3ea09a19f4 --- /dev/null +++ b/deployments/deploy/minio-secret.yml @@ -0,0 +1,8 @@ +apiVersion: v1 +kind: Secret +metadata: + name: openim-minio-secret +type: Opaque +data: + minio-root-user: cm9vdA== # Base64 encoded "root" + minio-root-password: b3BlbklNMTIz # Base64 encoded "openIM123" diff --git a/deployments/deploy/minio-statefulset.yml b/deployments/deploy/minio-statefulset.yml index c8806ff12a..9cf0a42d03 100644 --- a/deployments/deploy/minio-statefulset.yml +++ b/deployments/deploy/minio-statefulset.yml @@ -31,12 +31,12 @@ spec: - name: MINIO_ROOT_USER valueFrom: secretKeyRef: - name: minio-secret + name: openim-minio-secret key: minio-root-user - name: MINIO_ROOT_PASSWORD valueFrom: secretKeyRef: - name: minio-secret + name: openim-minio-secret key: minio-root-password command: - "/bin/sh" @@ -76,12 +76,4 @@ spec: requests: storage: 2Gi ---- -apiVersion: v1 -kind: Secret -metadata: - name: minio-secret -type: Opaque -data: - minio-root-user: cm9vdA== # Base64 encoded "root" - minio-root-password: b3BlbklNMTIz # Base64 encoded "openIM123" + diff --git a/deployments/deploy/mongo-secret.yml b/deployments/deploy/mongo-secret.yml new file mode 100644 index 0000000000..c3c10af243 --- /dev/null +++ b/deployments/deploy/mongo-secret.yml @@ -0,0 +1,8 @@ +apiVersion: v1 +kind: Secret +metadata: + name: openim-mongo-secret +type: Opaque +data: + mongo_openim_username: b3BlbklN # base64 for "openIM", this user credentials need in authSource database. 
+ mongo_openim_password: b3BlbklNMTIz # base64 for "openIM123" diff --git a/deployments/deploy/mongo-statefulset.yml b/deployments/deploy/mongo-statefulset.yml index e8510fdf7b..41cd4cb7f8 100644 --- a/deployments/deploy/mongo-statefulset.yml +++ b/deployments/deploy/mongo-statefulset.yml @@ -47,27 +47,27 @@ spec: - name: MONGO_INITDB_ROOT_USERNAME valueFrom: secretKeyRef: - name: mongo-secret + name: openim-mongo-init-secret key: mongo_initdb_root_username - name: MONGO_INITDB_ROOT_PASSWORD valueFrom: secretKeyRef: - name: mongo-secret + name: openim-mongo-init-secret key: mongo_initdb_root_password - name: MONGO_INITDB_DATABASE valueFrom: secretKeyRef: - name: mongo-secret + name: openim-mongo-init-secret key: mongo_initdb_database - name: MONGO_OPENIM_USERNAME valueFrom: secretKeyRef: - name: mongo-secret + name: openim-mongo-init-secret key: mongo_openim_username - name: MONGO_OPENIM_PASSWORD valueFrom: secretKeyRef: - name: mongo-secret + name: openim-mongo-init-secret key: mongo_openim_password - name: TZ value: "Asia/Shanghai" @@ -93,3 +93,16 @@ spec: resources: requests: storage: 5Gi + +--- +apiVersion: v1 +kind: Secret +metadata: + name: openim-mongo-init-secret +type: Opaque +data: + mongo_initdb_root_username: cm9vdA== # base64 for "root" + mongo_initdb_root_password: b3BlbklNMTIz # base64 for "openIM123" + mongo_initdb_database: b3BlbmltX3Yz # base64 for "openim_v3" + mongo_openim_username: b3BlbklN # base64 for "openIM" + mongo_openim_password: b3BlbklNMTIz # base64 for "openIM123" diff --git a/deployments/deploy/openim-api-deployment.yml b/deployments/deploy/openim-api-deployment.yml index cb1a68075d..d2d27dc0c8 100644 --- a/deployments/deploy/openim-api-deployment.yml +++ b/deployments/deploy/openim-api-deployment.yml @@ -21,20 +21,18 @@ spec: - name: IMENV_REDIS_PASSWORD valueFrom: secretKeyRef: - name: redis-secret + name: openim-redis-secret key: redis-password - - - name: IMENV_MONGODB_PASSWORD - valueFrom: - secretKeyRef: - name: mongo-secret - key: mongo_openim_password - - name: IMENV_MONGODB_USERNAME valueFrom: secretKeyRef: - name: mongo-secret + name: openim-mongo-secret key: mongo_openim_username + - name: IMENV_MONGODB_PASSWORD + valueFrom: + secretKeyRef: + name: openim-mongo-secret + key: mongo_openim_password volumeMounts: - name: openim-config @@ -46,4 +44,4 @@ spec: volumes: - name: openim-config configMap: - name: openim-config \ No newline at end of file + name: openim-config diff --git a/deployments/deploy/openim-config.yml b/deployments/deploy/openim-config.yml index d0651bdeaa..105dd98e35 100644 --- a/deployments/deploy/openim-config.yml +++ b/deployments/deploy/openim-config.yml @@ -4,7 +4,7 @@ metadata: name: openim-config data: discovery.yml: | - enable: "kubernetes" + enable: "kubernetes" # "kubernetes" or "etcd" kubernetes: namespace: default etcd: @@ -26,7 +26,6 @@ data: log.yml: | # Log storage path, default is acceptable, change to a full path if modification is needed - # storageLocation: ../../../../logs/ storageLocation: ./logs/ # Log rotation period (in hours), default is acceptable rotationTime: 24 @@ -49,9 +48,9 @@ data: # Name of the database database: openim_v3 # Username for database authentication - username: openIM + username: '' # openIM # Password for database authentication - password: openIM123 + password: '' # openIM123 # Authentication source for database authentication, if use root user, set it to admin authSource: openim_v3 # Maximum number of connections in the connection pool @@ -1055,16 +1054,3 @@ data: - targets: [ 
internal_ip:12320 ] labels: namespace: default - ---- -apiVersion: v1 -kind: Secret -metadata: - name: mongo-secret -type: Opaque -data: - mongo_initdb_root_username: cm9vdA== # base64 for "root" - mongo_initdb_root_password: b3BlbklNMTIz # base64 for "openIM123" - mongo_initdb_database: b3BlbmltX3Yz # base64 for "openim_v3" - mongo_openim_username: b3BlbklN # base64 for "openIM" - mongo_openim_password: b3BlbklNMTIz # base64 for "openIM123" \ No newline at end of file diff --git a/deployments/deploy/openim-msggateway-deployment.yml b/deployments/deploy/openim-msggateway-deployment.yml index ba2b1b84ea..b1a142e23b 100644 --- a/deployments/deploy/openim-msggateway-deployment.yml +++ b/deployments/deploy/openim-msggateway-deployment.yml @@ -21,7 +21,7 @@ spec: - name: IMENV_REDIS_PASSWORD valueFrom: secretKeyRef: - name: redis-secret + name: openim-redis-secret key: redis-password volumeMounts: - name: openim-config diff --git a/deployments/deploy/openim-msgtransfer-deployment.yml b/deployments/deploy/openim-msgtransfer-deployment.yml index be608845ca..323ed56604 100644 --- a/deployments/deploy/openim-msgtransfer-deployment.yml +++ b/deployments/deploy/openim-msgtransfer-deployment.yml @@ -21,13 +21,23 @@ spec: - name: IMENV_REDIS_PASSWORD valueFrom: secretKeyRef: - name: redis-secret + name: openim-redis-secret key: redis-password + - name: IMENV_MONGODB_USERNAME + valueFrom: + secretKeyRef: + name: openim-mongo-secret + key: mongo_openim_username - name: IMENV_MONGODB_PASSWORD valueFrom: secretKeyRef: - name: mongo-secret + name: openim-mongo-secret key: mongo_openim_password + - name: IMENV_KAFKA_PASSWORD + valueFrom: + secretKeyRef: + name: openim-kafka-secret + key: kafka-password volumeMounts: - name: openim-config mountPath: "/config" diff --git a/deployments/deploy/openim-push-deployment.yml b/deployments/deploy/openim-push-deployment.yml index 2092b343c6..bb36170e9a 100644 --- a/deployments/deploy/openim-push-deployment.yml +++ b/deployments/deploy/openim-push-deployment.yml @@ -21,8 +21,13 @@ spec: - name: IMENV_REDIS_PASSWORD valueFrom: secretKeyRef: - name: redis-secret + name: openim-redis-secret key: redis-password + - name: IMENV_KAFKA_PASSWORD + valueFrom: + secretKeyRef: + name: openim-kafka-secret + key: kafka-password volumeMounts: - name: openim-config mountPath: "/config" diff --git a/deployments/deploy/openim-rpc-auth-deployment.yml b/deployments/deploy/openim-rpc-auth-deployment.yml index b785ea92f6..a15c901f5f 100644 --- a/deployments/deploy/openim-rpc-auth-deployment.yml +++ b/deployments/deploy/openim-rpc-auth-deployment.yml @@ -22,7 +22,7 @@ spec: - name: IMENV_REDIS_PASSWORD valueFrom: secretKeyRef: - name: redis-secret + name: openim-redis-secret key: redis-password volumeMounts: - name: openim-config diff --git a/deployments/deploy/openim-rpc-conversation-deployment.yml b/deployments/deploy/openim-rpc-conversation-deployment.yml index 4d7a32497f..2c9bde3377 100644 --- a/deployments/deploy/openim-rpc-conversation-deployment.yml +++ b/deployments/deploy/openim-rpc-conversation-deployment.yml @@ -21,12 +21,17 @@ spec: - name: IMENV_REDIS_PASSWORD valueFrom: secretKeyRef: - name: redis-secret + name: openim-redis-secret key: redis-password + - name: IMENV_MONGODB_USERNAME + valueFrom: + secretKeyRef: + name: openim-mongo-secret + key: mongo_openim_username - name: IMENV_MONGODB_PASSWORD valueFrom: secretKeyRef: - name: mongo-secret + name: openim-mongo-secret key: mongo_openim_password volumeMounts: - name: openim-config diff --git 
a/deployments/deploy/openim-rpc-friend-deployment.yml b/deployments/deploy/openim-rpc-friend-deployment.yml index 5fdd3bf62b..e01238888f 100644 --- a/deployments/deploy/openim-rpc-friend-deployment.yml +++ b/deployments/deploy/openim-rpc-friend-deployment.yml @@ -14,19 +14,24 @@ spec: spec: containers: - name: friend-rpc-server-container - image: openim/openim-rpc-friend:v3.8.3 + image: openim/openim-rpc-friend:v3.8.3 env: - name: CONFIG_PATH value: "/config" - name: IMENV_REDIS_PASSWORD valueFrom: secretKeyRef: - name: redis-secret + name: openim-redis-secret key: redis-password + - name: IMENV_MONGODB_USERNAME + valueFrom: + secretKeyRef: + name: openim-mongo-secret + key: mongo_openim_username - name: IMENV_MONGODB_PASSWORD valueFrom: secretKeyRef: - name: mongo-secret + name: openim-mongo-secret key: mongo_openim_password volumeMounts: - name: openim-config @@ -38,4 +43,4 @@ spec: volumes: - name: openim-config configMap: - name: openim-config \ No newline at end of file + name: openim-config diff --git a/deployments/deploy/openim-rpc-group-deployment.yml b/deployments/deploy/openim-rpc-group-deployment.yml index 313fec897e..4698d60b56 100644 --- a/deployments/deploy/openim-rpc-group-deployment.yml +++ b/deployments/deploy/openim-rpc-group-deployment.yml @@ -15,19 +15,23 @@ spec: containers: - name: group-rpc-server-container image: openim/openim-rpc-group:v3.8.3 - env: - name: CONFIG_PATH value: "/config" - name: IMENV_REDIS_PASSWORD valueFrom: secretKeyRef: - name: redis-secret + name: openim-redis-secret key: redis-password + - name: IMENV_MONGODB_USERNAME + valueFrom: + secretKeyRef: + name: openim-mongo-secret + key: mongo_openim_username - name: IMENV_MONGODB_PASSWORD valueFrom: secretKeyRef: - name: mongo-secret + name: openim-mongo-secret key: mongo_openim_password volumeMounts: - name: openim-config @@ -39,4 +43,4 @@ spec: volumes: - name: openim-config configMap: - name: openim-config \ No newline at end of file + name: openim-config diff --git a/deployments/deploy/openim-rpc-msg-deployment.yml b/deployments/deploy/openim-rpc-msg-deployment.yml index e883f5849f..26a8333427 100644 --- a/deployments/deploy/openim-rpc-msg-deployment.yml +++ b/deployments/deploy/openim-rpc-msg-deployment.yml @@ -14,20 +14,30 @@ spec: spec: containers: - name: msg-rpc-server-container - image: openim/openim-rpc-msg:v3.8.3 + image: openim/openim-rpc-msg:v3.8.3 env: - name: CONFIG_PATH value: "/config" - name: IMENV_REDIS_PASSWORD valueFrom: secretKeyRef: - name: redis-secret + name: openim-redis-secret key: redis-password + - name: IMENV_MONGODB_USERNAME + valueFrom: + secretKeyRef: + name: openim-mongo-secret + key: mongo_openim_username - name: IMENV_MONGODB_PASSWORD valueFrom: secretKeyRef: - name: mongo-secret + name: openim-mongo-secret key: mongo_openim_password + - name: IMENV_KAFKA_PASSWORD + valueFrom: + secretKeyRef: + name: openim-kafka-secret + key: kafka-password volumeMounts: - name: openim-config mountPath: "/config" @@ -38,4 +48,4 @@ spec: volumes: - name: openim-config configMap: - name: openim-config \ No newline at end of file + name: openim-config diff --git a/deployments/deploy/openim-rpc-third-deployment.yml b/deployments/deploy/openim-rpc-third-deployment.yml index 326aaee035..f6919f510b 100644 --- a/deployments/deploy/openim-rpc-third-deployment.yml +++ b/deployments/deploy/openim-rpc-third-deployment.yml @@ -14,29 +14,34 @@ spec: spec: containers: - name: third-rpc-server-container - image: openim/openim-rpc-third:v3.8.3 + image: openim/openim-rpc-third:v3.8.3 env: - name: 
CONFIG_PATH value: "/config" - name: IMENV_MINIO_ACCESSKEYID valueFrom: secretKeyRef: - name: minio-secret + name: openim-minio-secret key: minio-root-user - name: IMENV_MINIO_SECRETACCESSKEY valueFrom: secretKeyRef: - name: minio-secret + name: openim-minio-secret key: minio-root-password - name: IMENV_REDIS_PASSWORD valueFrom: secretKeyRef: - name: redis-secret + name: openim-redis-secret key: redis-password + - name: IMENV_MONGODB_USERNAME + valueFrom: + secretKeyRef: + name: openim-mongo-secret + key: mongo_openim_username - name: IMENV_MONGODB_PASSWORD valueFrom: secretKeyRef: - name: mongo-secret + name: openim-mongo-secret key: mongo_openim_password volumeMounts: - name: openim-config diff --git a/deployments/deploy/openim-rpc-user-deployment.yml b/deployments/deploy/openim-rpc-user-deployment.yml index c6a47e1257..c3e36d1be7 100644 --- a/deployments/deploy/openim-rpc-user-deployment.yml +++ b/deployments/deploy/openim-rpc-user-deployment.yml @@ -21,13 +21,23 @@ spec: - name: IMENV_REDIS_PASSWORD valueFrom: secretKeyRef: - name: redis-secret + name: openim-redis-secret key: redis-password + - name: IMENV_MONGODB_USERNAME + valueFrom: + secretKeyRef: + name: openim-mongo-secret + key: mongo_openim_username - name: IMENV_MONGODB_PASSWORD valueFrom: secretKeyRef: - name: mongo-secret + name: openim-mongo-secret key: mongo_openim_password + - name: IMENV_KAFKA_PASSWORD + valueFrom: + secretKeyRef: + name: openim-kafka-secret + key: kafka-password volumeMounts: - name: openim-config mountPath: "/config" diff --git a/deployments/deploy/redis-secret.yml b/deployments/deploy/redis-secret.yml new file mode 100644 index 0000000000..463ec95450 --- /dev/null +++ b/deployments/deploy/redis-secret.yml @@ -0,0 +1,7 @@ +apiVersion: v1 +kind: Secret +metadata: + name: openim-redis-secret +type: Opaque +data: + redis-password: b3BlbklNMTIz # "openIM123" in base64 diff --git a/deployments/deploy/redis-statefulset.yml b/deployments/deploy/redis-statefulset.yml index 3f6dc41bcf..5668b20cc9 100644 --- a/deployments/deploy/redis-statefulset.yml +++ b/deployments/deploy/redis-statefulset.yml @@ -29,9 +29,6 @@ spec: volumeMounts: - name: redis-data mountPath: /data - # - name: redis-config-volume - # mountPath: /usr/local/redis/config/redis.conf - # subPath: redis.conf command: [ "/bin/sh", @@ -56,11 +53,3 @@ spec: resources: requests: storage: 5Gi ---- -apiVersion: v1 -kind: Secret -metadata: - name: redis-secret -type: Opaque -data: - redis-password: b3BlbklNMTIz # "openIM123" in base64 From dec423eeb31ad789d92e87390ea7e3be9a8c04a3 Mon Sep 17 00:00:00 2001 From: OpenIM-Gordon <1432970085@qq.com> Date: Thu, 26 Dec 2024 17:53:54 +0800 Subject: [PATCH 3/3] fix: The message @ information will be set only for members in the group. 
(#3009) --- go.mod | 2 +- go.sum | 4 ++-- internal/rpc/msg/send.go | 20 +++++++++++++------- 3 files changed, 16 insertions(+), 10 deletions(-) diff --git a/go.mod b/go.mod index 05fe6db15e..3cbbff3c9f 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,7 @@ require ( github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/mitchellh/mapstructure v1.5.0 github.com/openimsdk/protocol v0.0.72-alpha.68 - github.com/openimsdk/tools v0.0.50-alpha.61 + github.com/openimsdk/tools v0.0.50-alpha.62 github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_golang v1.18.0 github.com/stretchr/testify v1.9.0 diff --git a/go.sum b/go.sum index 1fc3c33db3..846ef721de 100644 --- a/go.sum +++ b/go.sum @@ -349,8 +349,8 @@ github.com/openimsdk/gomake v0.0.15-alpha.2 h1:5Q8yl8ezy2yx+q8/ucU/t4kJnDfCzNOrk github.com/openimsdk/gomake v0.0.15-alpha.2/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= github.com/openimsdk/protocol v0.0.72-alpha.68 h1:Ekn6S9Ftt12Xs/p9kJ39RDr2gSwIczz+MmSHQE4lAek= github.com/openimsdk/protocol v0.0.72-alpha.68/go.mod h1:Iet+piS/jaS+kWWyj6EEr36mk4ISzIRYjoMSVA4dq2M= -github.com/openimsdk/tools v0.0.50-alpha.61 h1:zKEZwrj+fUVuyC6KR3kZp9zFaCCIFgoSbHO0r0mZ6h4= -github.com/openimsdk/tools v0.0.50-alpha.61/go.mod h1:JowL2jYr8tu4vcQe+5hJh4v3BtSx1T0CIS3pgU/Mw+U= +github.com/openimsdk/tools v0.0.50-alpha.62 h1:e/m1XL7+EXbkOoxr/En/612WcOPKOUHPBj0++gG6MuQ= +github.com/openimsdk/tools v0.0.50-alpha.62/go.mod h1:JowL2jYr8tu4vcQe+5hJh4v3BtSx1T0CIS3pgU/Mw+U= github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM= github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs= github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= diff --git a/internal/rpc/msg/send.go b/internal/rpc/msg/send.go index f01134f8f0..19f4e9ffd1 100644 --- a/internal/rpc/msg/send.go +++ b/internal/rpc/msg/send.go @@ -101,14 +101,14 @@ func (m *msgServer) setConversationAtInfo(nctx context.Context, msg *sdkws.MsgDa ConversationType: msg.SessionType, GroupID: msg.GroupID, } + memberUserIDList, err := m.GroupLocalCache.GetGroupMemberIDs(ctx, msg.GroupID) + if err != nil { + log.ZWarn(ctx, "GetGroupMemberIDs", err) + return + } tagAll := datautil.Contain(constant.AtAllString, msg.AtUserIDList...) 
if tagAll { - memberUserIDList, err := m.GroupLocalCache.GetGroupMemberIDs(ctx, msg.GroupID) - if err != nil { - log.ZWarn(ctx, "GetGroupMemberIDs", err) - return - } memberUserIDList = datautil.DeleteElems(memberUserIDList, msg.SendID) @@ -118,6 +118,9 @@ func (m *msgServer) setConversationAtInfo(nctx context.Context, msg *sdkws.MsgDa conversation.GroupAtType = &wrapperspb.Int32Value{Value: constant.AtAll} } else { // @Everyone and @other people conversation.GroupAtType = &wrapperspb.Int32Value{Value: constant.AtAllAtMe} + atUserID = datautil.SliceIntersectFuncs(atUserID, memberUserIDList, func(a string) string { return a }, func(b string) string { + return b + }) if err := m.conversationClient.SetConversations(ctx, atUserID, conversation); err != nil { log.ZWarn(ctx, "SetConversations", err, "userID", atUserID, "conversation", conversation) } @@ -131,10 +134,13 @@ func (m *msgServer) setConversationAtInfo(nctx context.Context, msg *sdkws.MsgDa return } + atUserID = datautil.SliceIntersectFuncs(msg.AtUserIDList, memberUserIDList, func(a string) string { return a }, func(b string) string { + return b + }) conversation.GroupAtType = &wrapperspb.Int32Value{Value: constant.AtMe} - if err := m.conversationClient.SetConversations(ctx, msg.AtUserIDList, conversation); err != nil { - log.ZWarn(ctx, "SetConversations", err, msg.AtUserIDList, conversation) + if err := m.conversationClient.SetConversations(ctx, atUserID, conversation); err != nil { + log.ZWarn(ctx, "SetConversations", err, atUserID, conversation) } }