Skip to content

Commit

Permalink
update gRPC Implement.
Browse files Browse the repository at this point in the history
  • Loading branch information
mo3et committed Nov 29, 2024
1 parent d3aca9d commit 154a19e
Show file tree
Hide file tree
Showing 10 changed files with 66 additions and 44 deletions.
1 change: 1 addition & 0 deletions internal/msggateway/hub_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ func (s *Server) Start(ctx context.Context, index int, conf *Config) error {
}

type Server struct {
msggateway.UnimplementedMsgGatewayServer
rpcPort int
LongConnServer LongConnServer
config *Config
Expand Down
1 change: 1 addition & 0 deletions internal/push/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
)

type pushServer struct {
pbpush.UnimplementedPushMsgServiceServer
database controller.PushDatabase
disCov discovery.SvcDiscoveryRegistry
offlinePusher offlinepush.OfflinePusher
Expand Down
1 change: 1 addition & 0 deletions internal/rpc/auth/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
)

type authServer struct {
pbauth.UnimplementedAuthServer
authDatabase controller.AuthDatabase
userRpcClient *rpcclient.UserRpcClient
RegisterCenter discovery.SvcDiscoveryRegistry
Expand Down
1 change: 1 addition & 0 deletions internal/rpc/conversation/conversation.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
)

type conversationServer struct {
pbconversation.UnimplementedConversationServer
msgRpcClient *rpcclient.MessageRpcClient
user *rpcclient.UserRpcClient
groupRpcClient *rpcclient.GroupRpcClient
Expand Down
1 change: 1 addition & 0 deletions internal/rpc/group/group.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ import (
)

type groupServer struct {
pbgroup.UnimplementedGroupServer
db controller.GroupDatabase
user rpcclient.UserRpcClient
notification *GroupNotificationSender
Expand Down
1 change: 1 addition & 0 deletions internal/rpc/msg/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ type (
msgNotificationSender *MsgNotificationSender // RPC client for sending msg notifications.
config *Config // Global configuration settings.
webhookClient *webhook.Client
msg.UnimplementedMsgServer
}

Config struct {
Expand Down
1 change: 1 addition & 0 deletions internal/rpc/relation/friend.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
)

type friendServer struct {
relation.UnimplementedFriendServer
db controller.FriendDatabase
blackDatabase controller.BlackDatabase
userRpcClient *rpcclient.UserRpcClient
Expand Down
101 changes: 57 additions & 44 deletions internal/rpc/third/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,74 +290,87 @@ func (t *thirdServer) apiAddress(prefix, name string) string {
func (t *thirdServer) DeleteOutdatedData(ctx context.Context, req *third.DeleteOutdatedDataReq) (*third.DeleteOutdatedDataResp, error) {
var conf config.Third
expireTime := time.UnixMilli(req.ExpireTime)
var deltotal int
excuteNum := 5

findPagination := &sdkws.RequestPagination{
PageNumber: 1,
ShowNumber: 1000,
ShowNumber: 500,
}

for i := 0; i < excuteNum; i++ {
// Find all expired data in S3 database
total, models, err := t.s3dataBase.FindByExpires(ctx, expireTime, findPagination)
log.ZDebug(ctx, "del type is ", "needDelType", req.ObjectGroup)

// Find all expired data in S3 database
total, models, err := t.s3dataBase.FindNeedDeleteObjectByDB(ctx, expireTime, req.ObjectGroup, findPagination)
if err != nil && errs.Unwrap(err) != mongo.ErrNoDocuments {
return nil, errs.Wrap(err)
}

if total == 0 {
log.ZDebug(ctx, "Not have OutdatedData", "delete Total", total)
return &third.DeleteOutdatedDataResp{Count: int32(total)}, nil
}

needDelObjectKeys := make([]string, len(models))
for _, model := range models {
needDelObjectKeys = append(needDelObjectKeys, model.Key)
}

// Remove duplicate keys, have the same key use in different models
needDelObjectKeys = datautil.Distinct(needDelObjectKeys)

for _, key := range needDelObjectKeys {
// Find all models by key
keyModels, err := t.s3dataBase.FindModelsByKey(ctx, key)
if err != nil && errs.Unwrap(err) != mongo.ErrNoDocuments {
return nil, errs.Wrap(err)
}
needDelObjectKeys := make([]string, len(models))
for _, model := range models {
needDelObjectKeys = append(needDelObjectKeys, model.Key)
}

// Remove duplicate keys, have the same key use in different models
needDelObjectKeys = datautil.Distinct(needDelObjectKeys)

for _, key := range needDelObjectKeys {
// Find all models by key
keyModels, err := t.s3dataBase.FindModelsByKey(ctx, key)
if err != nil && errs.Unwrap(err) != mongo.ErrNoDocuments {
return nil, errs.Wrap(err)
// check keyModels, if all keyModels.
needDelKey := true // Default can delete
for _, keymodel := range keyModels {
// If group is empty or CreateTime is after expireTime, can't delete this key
if keymodel.Group == "" || keymodel.CreateTime.After(expireTime) {
needDelKey = false
break
}
}

// check keyModels, if all keyModels.
needDelKey := true // Default can delete
for _, model := range keyModels {
// If group is empty or CreateTime is after expireTime, can't delete this key
if model.Group == "" || model.CreateTime.After(expireTime) {
needDelKey = false
break
// If this object is not referenced by not expire data, delete it
if needDelKey && t.minio != nil {
// If have a thumbnail, delete it
thumbnailKey, _ := t.getMinioImageThumbnailKey(ctx, key)
if thumbnailKey != "" {
err := t.s3dataBase.DeleteObject(ctx, thumbnailKey)
if err != nil {
log.ZWarn(ctx, "Delete thumbnail object is error:", errs.Wrap(err), "thumbnailKey", thumbnailKey)
}
}

// If this object is not referenced by not expire data, delete it
if needDelKey && t.minio != nil {
thumbnailKey, _ := t.getMinioImageThumbnailKey(ctx, key)

t.s3dataBase.DeleteObject(ctx, thumbnailKey)
t.s3dataBase.DeleteObject(ctx, key)

t.s3dataBase.DelS3Key(ctx, conf.Object.Enable, key)
// Delete object
err = t.s3dataBase.DeleteObject(ctx, key)
if err != nil {
log.ZWarn(ctx, "Delete object is error", errs.Wrap(err), "object key", key)
}
}

for _, model := range models {
// Delete all expired data row in S3 database
err := t.s3dataBase.DeleteSpecifiedData(ctx, model.Engine, model.Name)
// Delete cache key
err = t.s3dataBase.DelS3Key(ctx, conf.Object.Enable, key)
if err != nil {
return nil, errs.Wrap(err)
log.ZWarn(ctx, "Delete cache key is error:", errs.Wrap(err), "cache S3 key:", key)
}
}
}

if total < int64(findPagination.ShowNumber) {
break
// handle delete data in S3 database
for _, model := range models {
// Delete all expired data row in S3 database
err := t.s3dataBase.DeleteSpecifiedData(ctx, model.Engine, model.Name)
if err != nil {
return nil, errs.Wrap(err)
}

deltotal += int(total)
}

log.ZDebug(ctx, "DeleteOutdatedData", "delete Total", deltotal)
log.ZDebug(ctx, "DeleteOutdatedData", "delete Total", total)

return &third.DeleteOutdatedDataResp{}, nil
return &third.DeleteOutdatedDataResp{Count: int32(total)}, nil
}

type FormDataMate struct {
Expand Down
1 change: 1 addition & 0 deletions internal/rpc/third/third.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
)

type thirdServer struct {
third.UnimplementedThirdServer
thirdDatabase controller.ThirdDatabase
s3dataBase controller.S3Database
userRpcClient rpcclient.UserRpcClient
Expand Down
1 change: 1 addition & 0 deletions internal/rpc/user/user.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ import (
)

type userServer struct {
pbuser.UnimplementedUserServer
online cache.OnlineCache
db controller.UserDatabase
friendNotificationSender *relation.FriendNotificationSender
Expand Down

0 comments on commit 154a19e

Please sign in to comment.