diff --git a/internal/msgtransfer/init.go b/internal/msgtransfer/init.go index ee9f066448..13721d61b0 100644 --- a/internal/msgtransfer/init.go +++ b/internal/msgtransfer/init.go @@ -92,11 +92,11 @@ func Start(ctx context.Context, index int, config *Config) error { } client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin"))) - msgModel := redis.NewMsgCache(rdb) msgDocModel, err := mgo.NewMsgMongo(mgocli.GetDB()) if err != nil { return err } + msgModel := redis.NewMsgCache(rdb, msgDocModel) seqConversation, err := mgo.NewSeqConversationMongo(mgocli.GetDB()) if err != nil { return err diff --git a/internal/rpc/msg/server.go b/internal/rpc/msg/server.go index 991fb87044..7ccda6bd57 100644 --- a/internal/rpc/msg/server.go +++ b/internal/rpc/msg/server.go @@ -89,7 +89,7 @@ func Start(ctx context.Context, config *Config, client discovery.SvcDiscoveryReg if err != nil { return err } - msgModel := redis.NewMsgCache(rdb) + msgModel := redis.NewMsgCache(rdb, msgDocModel) seqConversation, err := mgo.NewSeqConversationMongo(mgocli.GetDB()) if err != nil { return err diff --git a/pkg/common/storage/cache/cachekey/msg.go b/pkg/common/storage/cache/cachekey/msg.go index 4c8b32cd05..ac449df38e 100644 --- a/pkg/common/storage/cache/cachekey/msg.go +++ b/pkg/common/storage/cache/cachekey/msg.go @@ -19,17 +19,12 @@ import ( ) const ( - messageCache = "MESSAGE_CACHE:" sendMsgFailedFlag = "SEND_MSG_FAILED_FLAG:" - messageCacheV2 = "MESSAGE_CACHE_V2:" + messageCache = "MSG_CACHE:" ) -func GetMessageCacheKey(conversationID string, seq int64) string { - return messageCache + conversationID + "_" + strconv.Itoa(int(seq)) -} - -func GetMessageCacheKeyV2(conversationID string, seq int64) string { - return messageCacheV2 + conversationID + "_" + strconv.Itoa(int(seq)) +func GetMsgCacheKey(conversationID string, seq int64) string { + return messageCache + conversationID + ":" + strconv.Itoa(int(seq)) } func GetSendMsgKey(id string) string { diff --git a/pkg/common/storage/cache/msg.go b/pkg/common/storage/cache/msg.go index 084fc63b56..9a04f3cf82 100644 --- a/pkg/common/storage/cache/msg.go +++ b/pkg/common/storage/cache/msg.go @@ -17,13 +17,9 @@ package cache import ( "context" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" - "github.com/openimsdk/protocol/sdkws" ) type MsgCache interface { - GetMessagesBySeq(ctx context.Context, conversationID string, seqs []int64) (seqMsg []*sdkws.MsgData, failedSeqList []int64, err error) - SetMessagesToCache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (int, error) - DeleteMessagesFromCache(ctx context.Context, conversationID string, seqs []int64) error SetSendMsgStatus(ctx context.Context, id string, status int32) error GetSendMsgStatus(ctx context.Context, id string) (int32, error) diff --git a/pkg/common/storage/cache/redis/msg.go b/pkg/common/storage/cache/redis/msg.go index 5fa43d97a4..16eacc126b 100644 --- a/pkg/common/storage/cache/redis/msg.go +++ b/pkg/common/storage/cache/redis/msg.go @@ -7,8 +7,6 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" - "github.com/openimsdk/open-im-server/v3/pkg/msgprocessor" - "github.com/openimsdk/protocol/sdkws" "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/utils/datautil" "github.com/redis/go-redis/v9" @@ -18,61 +16,24 @@ import ( // msgCacheTimeout is expiration time of message cache, 86400 seconds const msgCacheTimeout = time.Hour * 24 -func NewMsgCache(client redis.UniversalClient) cache.MsgCache { - return &msgCache{rdb: client} +func NewMsgCache(client redis.UniversalClient, db database.Msg) cache.MsgCache { + return &msgCache{ + rdb: client, + rcClient: rockscache.NewClient(client, *GetRocksCacheOptions()), + msgDocDatabase: db, + } } type msgCache struct { rdb redis.UniversalClient rcClient *rockscache.Client msgDocDatabase database.Msg - msgTable model.MsgDocModel -} - -func (c *msgCache) getMessageCacheKey(conversationID string, seq int64) string { - return cachekey.GetMessageCacheKey(conversationID, seq) } func (c *msgCache) getSendMsgKey(id string) string { return cachekey.GetSendMsgKey(id) } -func (c *msgCache) SetMessagesToCache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (int, error) { - msgMap := datautil.SliceToMap(msgs, func(msg *sdkws.MsgData) string { - return c.getMessageCacheKey(conversationID, msg.Seq) - }) - keys := datautil.Slice(msgs, func(msg *sdkws.MsgData) string { - return c.getMessageCacheKey(conversationID, msg.Seq) - }) - err := ProcessKeysBySlot(ctx, c.rdb, keys, func(ctx context.Context, slot int64, keys []string) error { - var values []string - for _, key := range keys { - if msg, ok := msgMap[key]; ok { - s, err := msgprocessor.Pb2String(msg) - if err != nil { - return err - } - values = append(values, s) - } - } - return LuaSetBatchWithCommonExpire(ctx, c.rdb, keys, values, int(msgCacheTimeout/time.Second)) - }) - if err != nil { - return 0, err - } - return len(msgs), nil -} - -func (c *msgCache) DeleteMessagesFromCache(ctx context.Context, conversationID string, seqs []int64) error { - var keys []string - for _, seq := range seqs { - keys = append(keys, c.getMessageCacheKey(conversationID, seq)) - } - return ProcessKeysBySlot(ctx, c.rdb, keys, func(ctx context.Context, slot int64, keys []string) error { - return LuaDeleteBatch(ctx, c.rdb, keys) - }) -} - func (c *msgCache) SetSendMsgStatus(ctx context.Context, id string, status int32) error { return errs.Wrap(c.rdb.Set(ctx, c.getSendMsgKey(id), status, time.Hour*24).Err()) } @@ -82,49 +43,12 @@ func (c *msgCache) GetSendMsgStatus(ctx context.Context, id string) (int32, erro return int32(result), errs.Wrap(err) } -func (c *msgCache) GetMessagesBySeq(ctx context.Context, conversationID string, seqs []int64) (seqMsgs []*sdkws.MsgData, failedSeqs []int64, err error) { - var keys []string - keySeqMap := make(map[string]int64, 10) - for _, seq := range seqs { - key := c.getMessageCacheKey(conversationID, seq) - keys = append(keys, key) - keySeqMap[key] = seq - } - err = ProcessKeysBySlot(ctx, c.rdb, keys, func(ctx context.Context, slot int64, keys []string) error { - result, err := LuaGetBatch(ctx, c.rdb, keys) - if err != nil { - return err - } - for i, value := range result { - seq := keySeqMap[keys[i]] - if value == nil { - failedSeqs = append(failedSeqs, seq) - continue - } - - msg := &sdkws.MsgData{} - msgString, ok := value.(string) - if !ok || msgprocessor.String2Pb(msgString, msg) != nil { - failedSeqs = append(failedSeqs, seq) - continue - } - seqMsgs = append(seqMsgs, msg) - - } - return nil - }) - if err != nil { - return nil, nil, err - } - return seqMsgs, failedSeqs, nil -} - func (c *msgCache) GetMessageBySeqs(ctx context.Context, conversationID string, seqs []int64) ([]*model.MsgInfoModel, error) { if len(seqs) == 0 { return nil, nil } getKey := func(seq int64) string { - return cachekey.GetMessageCacheKeyV2(conversationID, seq) + return cachekey.GetMsgCacheKey(conversationID, seq) } getMsgID := func(msg *model.MsgInfoModel) int64 { return msg.Msg.Seq @@ -140,7 +64,7 @@ func (c *msgCache) DelMessageBySeqs(ctx context.Context, conversationID string, return nil } keys := datautil.Slice(seqs, func(seq int64) string { - return cachekey.GetMessageCacheKeyV2(conversationID, seq) + return cachekey.GetMsgCacheKey(conversationID, seq) }) slotKeys, err := groupKeysBySlot(ctx, getRocksCacheRedisClient(c.rcClient), keys) if err != nil { diff --git a/pkg/common/storage/cache/redis/msg_test.go b/pkg/common/storage/cache/redis/msg_test.go deleted file mode 100644 index 10b9ce18b0..0000000000 --- a/pkg/common/storage/cache/redis/msg_test.go +++ /dev/null @@ -1,133 +0,0 @@ -// Copyright © 2023 OpenIM. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -package redis - -import ( - "context" - "fmt" - "github.com/openimsdk/protocol/sdkws" - "github.com/redis/go-redis/v9" - "github.com/stretchr/testify/assert" - "google.golang.org/protobuf/proto" - "testing" -) - -func Test_msgCache_SetMessagesToCache(t *testing.T) { - type fields struct { - rdb redis.UniversalClient - } - type args struct { - ctx context.Context - conversationID string - msgs []*sdkws.MsgData - } - tests := []struct { - name string - fields fields - args args - want int - wantErr assert.ErrorAssertionFunc - }{ - {"test1", fields{rdb: redis.NewClient(&redis.Options{Addr: "localhost:16379", Username: "", Password: "openIM123", DB: 0})}, args{context.Background(), - "cid", []*sdkws.MsgData{{Seq: 1}, {Seq: 2}, {Seq: 3}}}, 3, assert.NoError}, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - c := &msgCache{ - rdb: tt.fields.rdb, - } - got, err := c.SetMessagesToCache(tt.args.ctx, tt.args.conversationID, tt.args.msgs) - if !tt.wantErr(t, err, fmt.Sprintf("SetMessagesToCache(%v, %v, %v)", tt.args.ctx, tt.args.conversationID, tt.args.msgs)) { - return - } - assert.Equalf(t, tt.want, got, "SetMessagesToCache(%v, %v, %v)", tt.args.ctx, tt.args.conversationID, tt.args.msgs) - }) - } -} - -func Test_msgCache_GetMessagesBySeq(t *testing.T) { - type fields struct { - rdb redis.UniversalClient - } - type args struct { - ctx context.Context - conversationID string - seqs []int64 - } - var failedSeq []int64 - tests := []struct { - name string - fields fields - args args - wantSeqMsgs []*sdkws.MsgData - wantFailedSeqs []int64 - wantErr assert.ErrorAssertionFunc - }{ - {"test1", fields{rdb: redis.NewClient(&redis.Options{Addr: "localhost:16379", Password: "openIM123", DB: 0})}, - args{context.Background(), "cid", []int64{1, 2, 3}}, - []*sdkws.MsgData{{Seq: 1}, {Seq: 2}, {Seq: 3}}, failedSeq, assert.NoError}, - {"test2", fields{rdb: redis.NewClient(&redis.Options{Addr: "localhost:16379", Password: "openIM123", DB: 0})}, - args{context.Background(), "cid", []int64{4, 5, 6}}, - nil, []int64{4, 5, 6}, assert.NoError}, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - c := &msgCache{ - rdb: tt.fields.rdb, - } - gotSeqMsgs, gotFailedSeqs, err := c.GetMessagesBySeq(tt.args.ctx, tt.args.conversationID, tt.args.seqs) - if !tt.wantErr(t, err, fmt.Sprintf("GetMessagesBySeq(%v, %v, %v)", tt.args.ctx, tt.args.conversationID, tt.args.seqs)) { - return - } - equalMsgDataSlices(t, tt.wantSeqMsgs, gotSeqMsgs) - assert.Equalf(t, tt.wantFailedSeqs, gotFailedSeqs, "GetMessagesBySeq(%v, %v, %v)", tt.args.ctx, tt.args.conversationID, tt.args.seqs) - }) - } -} - -func equalMsgDataSlices(t *testing.T, expected, actual []*sdkws.MsgData) { - assert.Equal(t, len(expected), len(actual), "Slices have different lengths") - for i := range expected { - assert.True(t, proto.Equal(expected[i], actual[i]), "Element %d not equal: expected %v, got %v", i, expected[i], actual[i]) - } -} - -func Test_msgCache_DeleteMessagesFromCache(t *testing.T) { - type fields struct { - rdb redis.UniversalClient - } - type args struct { - ctx context.Context - conversationID string - seqs []int64 - } - tests := []struct { - name string - fields fields - args args - wantErr assert.ErrorAssertionFunc - }{ - {"test1", fields{rdb: redis.NewClient(&redis.Options{Addr: "localhost:16379", Password: "openIM123"})}, - args{context.Background(), "cid", []int64{1, 2, 3}}, assert.NoError}, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - c := &msgCache{ - rdb: tt.fields.rdb, - } - tt.wantErr(t, c.DeleteMessagesFromCache(tt.args.ctx, tt.args.conversationID, tt.args.seqs), - fmt.Sprintf("DeleteMessagesFromCache(%v, %v, %v)", tt.args.ctx, tt.args.conversationID, tt.args.seqs)) - }) - } -} diff --git a/pkg/common/storage/controller/msg.go b/pkg/common/storage/controller/msg.go index e09cd14cf7..62587c8731 100644 --- a/pkg/common/storage/controller/msg.go +++ b/pkg/common/storage/controller/msg.go @@ -58,8 +58,6 @@ type CommonMsgDatabase interface { GetMsgBySeqs(ctx context.Context, userID string, conversationID string, seqs []int64) (minSeq int64, maxSeq int64, seqMsg []*sdkws.MsgData, err error) GetMessagesBySeqWithBounds(ctx context.Context, userID string, conversationID string, seqs []int64, pullOrder sdkws.PullOrder) (bool, int64, []*sdkws.MsgData, error) - // ClearUserMsgs marks messages for deletion based on clear time and returns a list of sequence numbers for marked messages. - ClearUserMsgs(ctx context.Context, userID string, conversationID string, clearTime int64, lastMsgClearTime time.Time) (seqs []int64, err error) // DeleteUserMsgsBySeqs allows a user to delete messages based on sequence numbers. DeleteUserMsgsBySeqs(ctx context.Context, userID string, conversationID string, seqs []int64) error // DeleteMsgsPhysicalBySeqs physically deletes messages by emptying them based on sequence numbers. @@ -112,7 +110,7 @@ func NewCommonMsgDatabase(msgDocModel database.Msg, msg cache.MsgCache, seqUser } return &commonMsgDatabase{ msgDocDatabase: msgDocModel, - msg: msg, + msgCache: msg, seqUser: seqUser, seqConversation: seqConversation, producer: producerToRedis, @@ -122,7 +120,7 @@ func NewCommonMsgDatabase(msgDocModel database.Msg, msg cache.MsgCache, seqUser type commonMsgDatabase struct { msgDocDatabase database.Msg msgTable model.MsgDocModel - msg cache.MsgCache + msgCache cache.MsgCache seqConversation cache.SeqConversationCache seqUser cache.SeqUser producer *kafka.Producer @@ -239,7 +237,7 @@ func (db *commonMsgDatabase) RevokeMsg(ctx context.Context, conversationID strin if err := db.batchInsertBlock(ctx, conversationID, []any{revoke}, updateKeyRevoke, seq); err != nil { return err } - return db.msg.DelMessageBySeqs(ctx, conversationID, []int64{seq}) + return db.msgCache.DelMessageBySeqs(ctx, conversationID, []int64{seq}) } func (db *commonMsgDatabase) MarkSingleChatMsgsAsRead(ctx context.Context, userID string, conversationID string, totalSeqs []int64) error { @@ -254,7 +252,7 @@ func (db *commonMsgDatabase) MarkSingleChatMsgsAsRead(ctx context.Context, userI return err } } - return db.msg.DelMessageBySeqs(ctx, conversationID, totalSeqs) + return db.msgCache.DelMessageBySeqs(ctx, conversationID, totalSeqs) } func (db *commonMsgDatabase) getMsgBySeqs(ctx context.Context, userID, conversationID string, seqs []int64) (totalMsgs []*sdkws.MsgData, err error) { @@ -533,82 +531,8 @@ func (db *commonMsgDatabase) GetMessagesBySeqWithBounds(ctx context.Context, use return isEnd, endSeq, successMsgs, nil } -func (db *commonMsgDatabase) ClearUserMsgs(ctx context.Context, userID string, conversationID string, clearTime int64, lastMsgClearTime time.Time) (seqs []int64, err error) { - var index int64 - for { - // from oldest 2 newest, ASC - msgDocModel, err := db.msgDocDatabase.GetMsgDocModelByIndex(ctx, conversationID, index, 1) - if err != nil || msgDocModel.DocID == "" { - if err != nil { - if err == model.ErrMsgListNotExist { - log.ZDebug(ctx, "not doc find", "conversationID", conversationID, "userID", userID, "index", index) - } else { - log.ZError(ctx, "deleteMsgRecursion GetUserMsgListByIndex failed", err, "conversationID", conversationID, "index", index) - } - } - // If an error is reported, or the error cannot be obtained, it is physically deleted and seq delMongoMsgsPhysical(delStruct.delDocIDList) is returned to end the recursion - break - } - - index++ - - // && msgDocModel.Msg[0].Msg.SendTime > lastMsgClearTime.UnixMilli() - if len(msgDocModel.Msg) > 0 { - i := 0 - var over bool - for _, msg := range msgDocModel.Msg { - i++ - // over clear time, need to clear - if msg != nil && msg.Msg != nil && msg.Msg.SendTime+clearTime*1000 <= time.Now().UnixMilli() { - // if msg is not in del list, add to del list - if msg.Msg.SendTime+clearTime*1000 > lastMsgClearTime.UnixMilli() && !datautil.Contain(userID, msg.DelList...) { - seqs = append(seqs, msg.Msg.Seq) - } - } else { - log.ZDebug(ctx, "all msg need destruct is found", "conversationID", conversationID, "userID", userID, "index", index, "stop index", i) - over = true - break - } - } - if over { - break - } - } - } - - log.ZDebug(ctx, "ClearUserMsgs", "conversationID", conversationID, "userID", userID, "seqs", seqs) - - // have msg need to destruct - if len(seqs) > 0 { - // update min seq to clear after - userMinSeq := seqs[len(seqs)-1] + 1 // user min seq when clear after - currentUserMinSeq, err := db.seqUser.GetUserMinSeq(ctx, conversationID, userID) // user min seq when clear before - if err != nil { - return nil, err - } - - // if before < after, update min seq - if currentUserMinSeq < userMinSeq { - if err := db.seqUser.SetUserMinSeq(ctx, conversationID, userID, userMinSeq); err != nil { - return nil, err - } - } - } - return seqs, nil -} - -// this is struct for recursion. -type delMsgRecursionStruct struct { - minSeq int64 - delDocIDs []string -} - -func (d *delMsgRecursionStruct) getSetMinSeq() int64 { - return d.minSeq -} - func (db *commonMsgDatabase) DeleteMsgsPhysicalBySeqs(ctx context.Context, conversationID string, allSeqs []int64) error { - if err := db.msg.DeleteMessagesFromCache(ctx, conversationID, allSeqs); err != nil { + if err := db.msgCache.DelMessageBySeqs(ctx, conversationID, allSeqs); err != nil { return err } for docID, seqs := range db.msgTable.GetDocIDSeqsMap(conversationID, allSeqs) { @@ -620,11 +544,11 @@ func (db *commonMsgDatabase) DeleteMsgsPhysicalBySeqs(ctx context.Context, conve return err } } - return db.msg.DelMessageBySeqs(ctx, conversationID, allSeqs) + return db.msgCache.DelMessageBySeqs(ctx, conversationID, allSeqs) } func (db *commonMsgDatabase) DeleteUserMsgsBySeqs(ctx context.Context, userID string, conversationID string, seqs []int64) error { - if err := db.msg.DeleteMessagesFromCache(ctx, conversationID, seqs); err != nil { + if err := db.msgCache.DelMessageBySeqs(ctx, conversationID, seqs); err != nil { return err } for docID, seqs := range db.msgTable.GetDocIDSeqsMap(conversationID, seqs) { @@ -634,7 +558,7 @@ func (db *commonMsgDatabase) DeleteUserMsgsBySeqs(ctx context.Context, userID st } } } - return db.msg.DelMessageBySeqs(ctx, conversationID, seqs) + return db.msgCache.DelMessageBySeqs(ctx, conversationID, seqs) } func (db *commonMsgDatabase) GetMaxSeqs(ctx context.Context, conversationIDs []string) (map[string]int64, error) { @@ -678,11 +602,11 @@ func (db *commonMsgDatabase) GetHasReadSeq(ctx context.Context, userID string, c } func (db *commonMsgDatabase) SetSendMsgStatus(ctx context.Context, id string, status int32) error { - return db.msg.SetSendMsgStatus(ctx, id, status) + return db.msgCache.SetSendMsgStatus(ctx, id, status) } func (db *commonMsgDatabase) GetSendMsgStatus(ctx context.Context, id string) (int32, error) { - return db.msg.GetSendMsgStatus(ctx, id) + return db.msgCache.GetSendMsgStatus(ctx, id) } func (db *commonMsgDatabase) GetConversationMinMaxSeqInMongoAndCache(ctx context.Context, conversationID string) (minSeqMongo, maxSeqMongo, minSeqCache, maxSeqCache int64, err error) { @@ -812,7 +736,7 @@ func (db *commonMsgDatabase) DeleteDoc(ctx context.Context, docID string) error if err := db.msgDocDatabase.DeleteDoc(ctx, docID); err != nil { return err } - return db.msg.DelMessageBySeqs(ctx, conversationID, seqs) + return db.msgCache.DelMessageBySeqs(ctx, conversationID, seqs) } func (db *commonMsgDatabase) GetLastMessageSeqByTime(ctx context.Context, conversationID string, time int64) (int64, error) { @@ -872,7 +796,7 @@ func (db *commonMsgDatabase) handlerQuote(ctx context.Context, userID, conversat } func (db *commonMsgDatabase) GetMessageBySeqs(ctx context.Context, conversationID string, userID string, seqs []int64) ([]*sdkws.MsgData, error) { - msgs, err := db.msg.GetMessageBySeqs(ctx, conversationID, seqs) + msgs, err := db.msgCache.GetMessageBySeqs(ctx, conversationID, seqs) if err != nil { return nil, err } diff --git a/pkg/common/storage/controller/msg_transfer.go b/pkg/common/storage/controller/msg_transfer.go index 1ecd786aa3..58b3a24374 100644 --- a/pkg/common/storage/controller/msg_transfer.go +++ b/pkg/common/storage/controller/msg_transfer.go @@ -5,7 +5,6 @@ import ( "github.com/openimsdk/protocol/constant" "github.com/openimsdk/open-im-server/v3/pkg/common/config" - "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/model" @@ -50,7 +49,7 @@ func NewMsgTransferDatabase(msgDocModel database.Msg, msg cache.MsgCache, seqUse } return &msgTransferDatabase{ msgDocDatabase: msgDocModel, - msg: msg, + msgCache: msg, seqUser: seqUser, seqConversation: seqConversation, producerToMongo: producerToMongo, @@ -61,7 +60,7 @@ func NewMsgTransferDatabase(msgDocModel database.Msg, msg cache.MsgCache, seqUse type msgTransferDatabase struct { msgDocDatabase database.Msg msgTable model.MsgDocModel - msg cache.MsgCache + msgCache cache.MsgCache seqConversation cache.SeqConversationCache seqUser cache.SeqUser producerToMongo *kafka.Producer @@ -73,10 +72,12 @@ func (db *msgTransferDatabase) BatchInsertChat2DB(ctx context.Context, conversat return errs.ErrArgs.WrapMsg("msgList is empty") } msgs := make([]any, len(msgList)) + seqs := make([]int64, len(msgList)) for i, msg := range msgList { if msg == nil { continue } + seqs[i] = msg.Seq var offlinePushModel *model.OfflinePushModel if msg.OfflinePushInfo != nil { offlinePushModel = &model.OfflinePushModel{ @@ -114,7 +115,10 @@ func (db *msgTransferDatabase) BatchInsertChat2DB(ctx context.Context, conversat Ex: msg.Ex, } } - return db.BatchInsertBlock(ctx, conversationID, msgs, updateKeyMsg, msgList[0].Seq) + if err := db.BatchInsertBlock(ctx, conversationID, msgs, updateKeyMsg, msgList[0].Seq); err != nil { + return err + } + return db.msgCache.DelMessageBySeqs(ctx, conversationID, seqs) } func (db *msgTransferDatabase) BatchInsertBlock(ctx context.Context, conversationID string, fields []any, key int8, firstSeq int64) error { @@ -219,7 +223,7 @@ func (db *msgTransferDatabase) BatchInsertBlock(ctx context.Context, conversatio } func (db *msgTransferDatabase) DeleteMessagesFromCache(ctx context.Context, conversationID string, seqs []int64) error { - return db.msg.DeleteMessagesFromCache(ctx, conversationID, seqs) + return db.msgCache.DelMessageBySeqs(ctx, conversationID, seqs) } func (db *msgTransferDatabase) BatchInsertChat2Cache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (seq int64, isNew bool, userHasReadMap map[string]int64, err error) { @@ -238,20 +242,17 @@ func (db *msgTransferDatabase) BatchInsertChat2Cache(ctx context.Context, conver isNew = currentMaxSeq == 0 lastMaxSeq := currentMaxSeq userSeqMap := make(map[string]int64) + seqs := make([]int64, 0, lenList) for _, m := range msgs { currentMaxSeq++ m.Seq = currentMaxSeq userSeqMap[m.SendID] = m.Seq + seqs = append(seqs, m.Seq) } - - failedNum, err := db.msg.SetMessagesToCache(ctx, conversationID, msgs) - if err != nil { - prommetrics.MsgInsertRedisFailedCounter.Add(float64(failedNum)) - log.ZError(ctx, "setMessageToCache error", err, "len", len(msgs), "conversationID", conversationID) - } else { - prommetrics.MsgInsertRedisSuccessCounter.Inc() + if err := db.msgCache.DelMessageBySeqs(ctx, conversationID, seqs); err != nil { + return 0, false, nil, err } - return lastMaxSeq, isNew, userSeqMap, errs.Wrap(err) + return lastMaxSeq, isNew, userSeqMap, nil } func (db *msgTransferDatabase) SetHasReadSeqs(ctx context.Context, conversationID string, userSeqMap map[string]int64) error {