Skip to content

Commit

Permalink
Merge branch 'openimsdk:main' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
icey-yu authored Aug 1, 2024
2 parents 8c8694c + 5dac915 commit 79c40a8
Show file tree
Hide file tree
Showing 10 changed files with 73 additions and 61 deletions.
1 change: 1 addition & 0 deletions internal/push/push_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ func (c *ConsumerHandler) GetConnsAndOnlinePush(ctx context.Context, msg *sdkws.
offlineUserIDs = append(offlineUserIDs, userID)
}
}
log.ZDebug(ctx, "GetConnsAndOnlinePush online cache", "sendID", msg.SendID, "recvID", msg.RecvID, "groupID", msg.GroupID, "sessionType", msg.SessionType, "clientMsgID", msg.ClientMsgID, "serverMsgID", msg.ServerMsgID, "offlineUserIDs", offlineUserIDs, "onlineUserIDs", onlineUserIDs)
var result []*msggateway.SingleMsgToUserResults
if len(onlineUserIDs) > 0 {
var err error
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package friend
package relation

import (
"context"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package friend
package relation

import (
"context"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package friend
package relation

import (
"context"

"github.com/openimsdk/tools/mq/memamq"

"github.com/openimsdk/open-im-server/v3/pkg/common/config"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package friend
package relation

import (
"context"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package friend
package relation

import (
"context"
Expand Down
6 changes: 3 additions & 3 deletions internal/rpc/user/user.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ package user
import (
"context"
"errors"
"github.com/openimsdk/open-im-server/v3/internal/rpc/friend"
"github.com/openimsdk/open-im-server/v3/internal/rpc/relation"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
Expand Down Expand Up @@ -54,7 +54,7 @@ import (
type userServer struct {
online cache.OnlineCache
db controller.UserDatabase
friendNotificationSender *friend.FriendNotificationSender
friendNotificationSender *relation.FriendNotificationSender
userNotificationSender *UserNotificationSender
friendRpcClient *rpcclient.FriendRpcClient
groupRpcClient *rpcclient.GroupRpcClient
Expand Down Expand Up @@ -105,7 +105,7 @@ func Start(ctx context.Context, config *Config, client registry.SvcDiscoveryRegi
RegisterCenter: client,
friendRpcClient: &friendRpcClient,
groupRpcClient: &groupRpcClient,
friendNotificationSender: friend.NewFriendNotificationSender(&config.NotificationConfig, &msgRpcClient, friend.WithDBFunc(database.FindWithError)),
friendNotificationSender: relation.NewFriendNotificationSender(&config.NotificationConfig, &msgRpcClient, relation.WithDBFunc(database.FindWithError)),
userNotificationSender: NewUserNotificationSender(config, &msgRpcClient, WithUserFunc(database.FindWithError)),
config: config,
webhookClient: webhook.NewWebhookClient(config.WebhooksConfig.URL),
Expand Down
34 changes: 17 additions & 17 deletions pkg/common/cmd/friend.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ package cmd
import (
"context"

"github.com/openimsdk/open-im-server/v3/internal/rpc/friend"
"github.com/openimsdk/open-im-server/v3/internal/rpc/relation"
"github.com/openimsdk/open-im-server/v3/pkg/common/startrpc"
"github.com/openimsdk/open-im-server/v3/version"
"github.com/openimsdk/tools/system/program"
Expand All @@ -26,23 +26,23 @@ import (

type FriendRpcCmd struct {
*RootCmd
ctx context.Context
configMap map[string]any
friendConfig *friend.Config
ctx context.Context
configMap map[string]any
relationConfig *relation.Config
}

func NewFriendRpcCmd() *FriendRpcCmd {
var friendConfig friend.Config
ret := &FriendRpcCmd{friendConfig: &friendConfig}
var relationConfig relation.Config
ret := &FriendRpcCmd{relationConfig: &relationConfig}
ret.configMap = map[string]any{
OpenIMRPCFriendCfgFileName: &friendConfig.RpcConfig,
RedisConfigFileName: &friendConfig.RedisConfig,
MongodbConfigFileName: &friendConfig.MongodbConfig,
ShareFileName: &friendConfig.Share,
NotificationFileName: &friendConfig.NotificationConfig,
WebhooksConfigFileName: &friendConfig.WebhooksConfig,
LocalCacheConfigFileName: &friendConfig.LocalCacheConfig,
DiscoveryConfigFilename: &friendConfig.Discovery,
OpenIMRPCFriendCfgFileName: &relationConfig.RpcConfig,
RedisConfigFileName: &relationConfig.RedisConfig,
MongodbConfigFileName: &relationConfig.MongodbConfig,
ShareFileName: &relationConfig.Share,
NotificationFileName: &relationConfig.NotificationConfig,
WebhooksConfigFileName: &relationConfig.WebhooksConfig,
LocalCacheConfigFileName: &relationConfig.LocalCacheConfig,
DiscoveryConfigFilename: &relationConfig.Discovery,
}
ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap))
ret.ctx = context.WithValue(context.Background(), "version", version.Version)
Expand All @@ -57,7 +57,7 @@ func (a *FriendRpcCmd) Exec() error {
}

func (a *FriendRpcCmd) runE() error {
return startrpc.Start(a.ctx, &a.friendConfig.Discovery, &a.friendConfig.RpcConfig.Prometheus, a.friendConfig.RpcConfig.RPC.ListenIP,
a.friendConfig.RpcConfig.RPC.RegisterIP, a.friendConfig.RpcConfig.RPC.Ports,
a.Index(), a.friendConfig.Share.RpcRegisterName.Friend, &a.friendConfig.Share, a.friendConfig, friend.Start)
return startrpc.Start(a.ctx, &a.relationConfig.Discovery, &a.relationConfig.RpcConfig.Prometheus, a.relationConfig.RpcConfig.RPC.ListenIP,
a.relationConfig.RpcConfig.RPC.RegisterIP, a.relationConfig.RpcConfig.RPC.Ports,
a.Index(), a.relationConfig.Share.RpcRegisterName.Friend, &a.relationConfig.Share, a.relationConfig, relation.Start)
}
6 changes: 5 additions & 1 deletion pkg/common/storage/cache/redis/online.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache"
"github.com/openimsdk/open-im-server/v3/pkg/common/storage/cache/cachekey"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log"
"github.com/redis/go-redis/v9"
"strconv"
"time"
Expand Down Expand Up @@ -82,8 +83,11 @@ func (s *userOnline) SetUserOnline(ctx context.Context, userID string, online, o
argv = append(argv, platformID)
}
keys := []string{s.getUserOnlineKey(userID), userID, s.channelName}
if err := s.rdb.Eval(ctx, script, keys, argv).Err(); err != nil {
status, err := s.rdb.Eval(ctx, script, keys, argv).Result()
if err != nil {
log.ZError(ctx, "redis SetUserOnline", err, "userID", userID, "online", online, "offline", offline)
return err
}
log.ZDebug(ctx, "redis SetUserOnline", "userID", userID, "online", online, "offline", offline, "status", status)
return nil
}
76 changes: 41 additions & 35 deletions pkg/rpccache/online.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func NewOnlineCache(user rpcclient.UserRpcClient, group *GroupLocalCache, rdb re
for message := range rdb.Subscribe(ctx, cachekey.OnlineChannel).Channel() {
userID, platformIDs, err := useronline.ParseUserOnlineStatus(message.Payload)
if err != nil {
log.ZError(ctx, "OnlineCache redis subscribe parseUserOnlineStatus", err, "payload", message.Payload, "channel", message.Channel)
log.ZError(ctx, "OnlineCache setUserOnline redis subscribe parseUserOnlineStatus", err, "payload", message.Payload, "channel", message.Channel)
continue
}
storageCache := x.setUserOnline(userID, platformIDs)
Expand All @@ -48,9 +48,15 @@ type OnlineCache struct {
}

func (o *OnlineCache) GetUserOnlinePlatform(ctx context.Context, userID string) ([]int32, error) {
return o.local.Get(userID, func() ([]int32, error) {
platformIDs, err := o.local.Get(userID, func() ([]int32, error) {
return o.user.GetUserOnlinePlatform(ctx, userID)
})
if err != nil {
log.ZError(ctx, "OnlineCache GetUserOnlinePlatform", err, "userID", userID)
return nil, err
}
log.ZDebug(ctx, "OnlineCache GetUserOnlinePlatform", "userID", userID, "platformIDs", platformIDs)
return platformIDs, nil
}

func (o *OnlineCache) GetUserOnline(ctx context.Context, userID string) (bool, error) {
Expand All @@ -61,39 +67,39 @@ func (o *OnlineCache) GetUserOnline(ctx context.Context, userID string) (bool, e
return len(platformIDs) > 0, nil
}

func (o *OnlineCache) GetUsersOnline(ctx context.Context, userIDs []string) ([]string, error) {
onlineUserIDs := make([]string, 0, len(userIDs))
for _, userID := range userIDs {
online, err := o.GetUserOnline(ctx, userID)
if err != nil {
return nil, err
}
if online {
onlineUserIDs = append(onlineUserIDs, userID)
}
}
log.ZDebug(ctx, "OnlineCache GetUsersOnline", "userIDs", userIDs, "onlineUserIDs", onlineUserIDs)
return onlineUserIDs, nil
}

func (o *OnlineCache) GetGroupOnline(ctx context.Context, groupID string) ([]string, error) {
userIDs, err := o.group.GetGroupMemberIDs(ctx, groupID)
if err != nil {
return nil, err
}
var onlineUserIDs []string
for _, userID := range userIDs {
online, err := o.GetUserOnline(ctx, userID)
if err != nil {
return nil, err
}
if online {
onlineUserIDs = append(onlineUserIDs, userID)
}
}
log.ZDebug(ctx, "OnlineCache GetGroupOnline", "groupID", groupID, "onlineUserIDs", onlineUserIDs, "allUserID", userIDs)
return onlineUserIDs, nil
}
//func (o *OnlineCache) GetUsersOnline(ctx context.Context, userIDs []string) ([]string, error) {
// onlineUserIDs := make([]string, 0, len(userIDs))
// for _, userID := range userIDs {
// online, err := o.GetUserOnline(ctx, userID)
// if err != nil {
// return nil, err
// }
// if online {
// onlineUserIDs = append(onlineUserIDs, userID)
// }
// }
// log.ZDebug(ctx, "OnlineCache GetUsersOnline", "userIDs", userIDs, "onlineUserIDs", onlineUserIDs)
// return onlineUserIDs, nil
//}
//
//func (o *OnlineCache) GetGroupOnline(ctx context.Context, groupID string) ([]string, error) {
// userIDs, err := o.group.GetGroupMemberIDs(ctx, groupID)
// if err != nil {
// return nil, err
// }
// var onlineUserIDs []string
// for _, userID := range userIDs {
// online, err := o.GetUserOnline(ctx, userID)
// if err != nil {
// return nil, err
// }
// if online {
// onlineUserIDs = append(onlineUserIDs, userID)
// }
// }
// log.ZDebug(ctx, "OnlineCache GetGroupOnline", "groupID", groupID, "onlineUserIDs", onlineUserIDs, "allUserID", userIDs)
// return onlineUserIDs, nil
//}

func (o *OnlineCache) setUserOnline(userID string, platformIDs []int32) bool {
return o.local.SetHas(userID, platformIDs)
Expand Down

0 comments on commit 79c40a8

Please sign in to comment.