Skip to content

Commit

Permalink
update loadConfig and discovery logic in kubernetes.
Browse files Browse the repository at this point in the history
  • Loading branch information
mo3et committed Dec 7, 2024
1 parent c9e13e6 commit 0d8f6dd
Show file tree
Hide file tree
Showing 14 changed files with 370 additions and 181 deletions.
7 changes: 4 additions & 3 deletions config/discovery.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ etcd:
username: ''
password: ''

kubenetes:
namespace: default

rpcService:
user: user-rpc-service
friend: friend-rpc-service
Expand All @@ -14,6 +17,4 @@ rpcService:
group: group-rpc-service
auth: auth-rpc-service
conversation: conversation-rpc-service
third: third-rpc-service


third: third-rpc-service
16 changes: 11 additions & 5 deletions internal/api/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,6 @@ package api
import (
"context"
"fmt"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/tools/utils/datautil"
"github.com/openimsdk/tools/utils/network"
"net"
"net/http"
"os"
Expand All @@ -28,6 +25,11 @@ import (
"syscall"
"time"

"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/tools/utils/datautil"
"github.com/openimsdk/tools/utils/network"
"github.com/openimsdk/tools/utils/runtimeenv"

kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister"
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
"github.com/openimsdk/tools/discovery"
Expand All @@ -40,6 +42,8 @@ type Config struct {
API config.API
Share config.Share
Discovery config.Discovery

RuntimeEnv string
}

func Start(ctx context.Context, index int, config *Config) error {
Expand All @@ -48,10 +52,12 @@ func Start(ctx context.Context, index int, config *Config) error {
return err
}

config.RuntimeEnv = runtimeenv.PrintRuntimeEnvironment()

var client discovery.SvcDiscoveryRegistry

// Determine whether zk is passed according to whether it is a clustered deployment
client, err = kdisc.NewDiscoveryRegister(&config.Discovery)
client, err = kdisc.NewDiscoveryRegister(&config.Discovery, config.RuntimeEnv)
if err != nil {
return errs.WrapMsg(err, "failed to register discovery service")
}
Expand Down Expand Up @@ -81,7 +87,7 @@ func Start(ctx context.Context, index int, config *Config) error {
address := net.JoinHostPort(network.GetListenIP(config.API.Api.ListenIP), strconv.Itoa(apiPort))

server := http.Server{Addr: address, Handler: router}
log.CInfo(ctx, "API server is initializing", "address", address, "apiPort", apiPort, "prometheusPort", prometheusPort)
log.CInfo(ctx, "API server is initializing", "runtimeEnv", config.RuntimeEnv, "address", address, "apiPort", apiPort, "prometheusPort", prometheusPort)
go func() {
err = server.ListenAndServe()
if err != nil && err != http.ErrServerClosed {
Expand Down
10 changes: 8 additions & 2 deletions internal/msggateway/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@ package msggateway

import (
"context"
"time"

"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/rpccache"
"github.com/openimsdk/tools/db/redisutil"
"github.com/openimsdk/tools/utils/datautil"
"time"
"github.com/openimsdk/tools/utils/runtimeenv"

"github.com/openimsdk/tools/log"
)
Expand All @@ -31,11 +33,15 @@ type Config struct {
RedisConfig config.Redis
WebhooksConfig config.Webhooks
Discovery config.Discovery

RuntimeEnv string
}

// Start run ws server.
func Start(ctx context.Context, index int, conf *Config) error {
log.CInfo(ctx, "MSG-GATEWAY server is initializing", "autoSetPorts", conf.MsgGateway.RPC.AutoSetPorts,
conf.RuntimeEnv = runtimeenv.PrintRuntimeEnvironment()

log.CInfo(ctx, "MSG-GATEWAY server is initializing", "runtimeEnv", conf.RuntimeEnv, "autoSetPorts", conf.MsgGateway.RPC.AutoSetPorts,
"rpcPorts", conf.MsgGateway.RPC.Ports,
"wsPort", conf.MsgGateway.LongConnSvr.Ports, "prometheusPorts", conf.MsgGateway.Prometheus.Ports)
wsPort, err := datautil.GetElemByIndex(conf.MsgGateway.LongConnSvr.Ports, index)
Expand Down
10 changes: 8 additions & 2 deletions internal/msgtransfer/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/openimsdk/tools/db/mongoutil"
"github.com/openimsdk/tools/db/redisutil"
"github.com/openimsdk/tools/utils/datautil"
"github.com/openimsdk/tools/utils/runtimeenv"

"github.com/openimsdk/open-im-server/v3/pkg/common/config"
discRegister "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister"
Expand All @@ -51,6 +52,8 @@ type MsgTransfer struct {
historyMongoCH *OnlineHistoryMongoConsumerHandler
ctx context.Context
cancel context.CancelFunc

runTimeEnv string
}

type Config struct {
Expand All @@ -64,7 +67,9 @@ type Config struct {
}

func Start(ctx context.Context, index int, config *Config) error {
log.CInfo(ctx, "MSG-TRANSFER server is initializing", "prometheusPorts",
runTimeEnv := runtimeenv.PrintRuntimeEnvironment()

log.CInfo(ctx, "MSG-TRANSFER server is initializing", "runTimeEnv", runTimeEnv, "prometheusPorts",
config.MsgTransfer.Prometheus.Ports, "index", index)

mgocli, err := mongoutil.NewMongoDB(ctx, config.MongodbConfig.Build())
Expand All @@ -75,7 +80,7 @@ func Start(ctx context.Context, index int, config *Config) error {
if err != nil {
return err
}
client, err := discRegister.NewDiscoveryRegister(&config.Discovery)
client, err := discRegister.NewDiscoveryRegister(&config.Discovery, runTimeEnv)
if err != nil {
return err
}
Expand Down Expand Up @@ -115,6 +120,7 @@ func Start(ctx context.Context, index int, config *Config) error {
msgTransfer := &MsgTransfer{
historyCH: historyCH,
historyMongoCH: historyMongoCH,
runTimeEnv: runTimeEnv,
}
return msgTransfer.Start(index, config)
}
Expand Down
9 changes: 7 additions & 2 deletions internal/tools/cron_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/openimsdk/tools/mcontext"
"github.com/openimsdk/tools/mw"
"github.com/openimsdk/tools/utils/runtimeenv"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"

Expand All @@ -26,14 +27,18 @@ type CronTaskConfig struct {
CronTask config.CronTask
Share config.Share
Discovery config.Discovery

runTimeEnv string
}

func Start(ctx context.Context, config *CronTaskConfig) error {
log.CInfo(ctx, "CRON-TASK server is initializing", "chatRecordsClearTime", config.CronTask.CronExecuteTime, "msgDestructTime", config.CronTask.RetainChatRecords)
config.runTimeEnv = runtimeenv.PrintRuntimeEnvironment()

log.CInfo(ctx, "CRON-TASK server is initializing", "runTimeEnv", config.runTimeEnv, "chatRecordsClearTime", config.CronTask.CronExecuteTime, "msgDestructTime", config.CronTask.RetainChatRecords)
if config.CronTask.RetainChatRecords < 1 {
return errs.New("msg destruct time must be greater than 1").Wrap()
}
client, err := kdisc.NewDiscoveryRegister(&config.Discovery)
client, err := kdisc.NewDiscoveryRegister(&config.Discovery, config.runTimeEnv)
if err != nil {
return errs.WrapMsg(err, "failed to register discovery service")
}
Expand Down
11 changes: 6 additions & 5 deletions pkg/common/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@ package cmd

import (
"fmt"
"path/filepath"

"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/version"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/utils/runtimeenv"
"github.com/spf13/cobra"
)

Expand Down Expand Up @@ -105,18 +105,19 @@ func (r *RootCmd) initializeConfiguration(cmd *cobra.Command, opts *CmdOpts) err
if err != nil {
return err
}

runtimeEnv := runtimeenv.PrintRuntimeEnvironment()

// Load common configuration file
//opts.configMap[ShareFileName] = StructEnvPrefix{EnvPrefix: shareEnvPrefix, ConfigStruct: &r.share}
for configFileName, configStruct := range opts.configMap {
err := config.LoadConfig(filepath.Join(configDirectory, configFileName),
ConfigEnvPrefixMap[configFileName], configStruct)
err := config.Load(configDirectory, configFileName, ConfigEnvPrefixMap[configFileName], runtimeEnv, configStruct)
if err != nil {
return err
}
}
// Load common log configuration file
return config.LoadConfig(filepath.Join(configDirectory, LogConfigFileName),
ConfigEnvPrefixMap[LogConfigFileName], &r.log)
return config.Load(configDirectory, LogConfigFileName, ConfigEnvPrefixMap[LogConfigFileName], runtimeEnv, &r.log)
}

func (r *RootCmd) applyOptions(opts ...func(*CmdOpts)) *CmdOpts {
Expand Down
8 changes: 7 additions & 1 deletion pkg/common/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@
package config

import (
"github.com/openimsdk/tools/s3/aws"
"strings"
"time"

"github.com/openimsdk/tools/s3/aws"

"github.com/openimsdk/tools/db/mongoutil"
"github.com/openimsdk/tools/db/redisutil"
"github.com/openimsdk/tools/mq/kafka"
Expand Down Expand Up @@ -474,9 +475,14 @@ type ZooKeeper struct {
type Discovery struct {
Enable string `mapstructure:"enable"`
Etcd Etcd `mapstructure:"etcd"`
Kubernetes Kubernetes `mapstructure:"kubernetes"`
RpcService RpcService `mapstructure:"rpcService"`
}

type Kubernetes struct {
Namespace string `mapstructure:"namespace"`
}

type Etcd struct {
RootDirectory string `mapstructure:"rootDirectory"`
Address []string `mapstructure:"address"`
Expand Down
6 changes: 6 additions & 0 deletions pkg/common/config/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@ package config

const ConfKey = "conf"

const (
MountConfigFilePath = "CONFIG_PATH"
DeploymentType = "DEPLOYMENT_TYPE"
KUBERNETES = "kubernetes"
)

const (
// DefaultDirPerm is used for creating general directories, allowing the owner to read, write, and execute,
// while the group and others can only read and execute.
Expand Down
20 changes: 18 additions & 2 deletions pkg/common/config/load_config.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,29 @@
package config

import (
"os"
"path/filepath"
"strings"

"github.com/mitchellh/mapstructure"
"github.com/openimsdk/tools/errs"
"github.com/spf13/viper"
"strings"
)

func LoadConfig(path string, envPrefix string, config any) error {
func Load(configDirectory string, configFileName string, envPrefix string, runtimeEnv string, config any) error {
if runtimeEnv == KUBERNETES {
mountPath := os.Getenv(MountConfigFilePath)
if mountPath == "" {
return errs.ErrArgs.WrapMsg(MountConfigFilePath + " env is empty")
}

return loadConfig(filepath.Join(mountPath, configFileName), envPrefix, config)
}

return loadConfig(filepath.Join(configDirectory, configFileName), envPrefix, config)
}

func loadConfig(path string, envPrefix string, config any) error {
v := viper.New()
v.SetConfigFile(path)
v.SetEnvPrefix(envPrefix)
Expand Down
40 changes: 32 additions & 8 deletions pkg/common/config/load_config_test.go
Original file line number Diff line number Diff line change
@@ -1,35 +1,59 @@
package config

import (
"github.com/stretchr/testify/assert"
"os"
"testing"

"github.com/stretchr/testify/assert"
)

func TestLoadLogConfig(t *testing.T) {
var log Log
err := LoadConfig("../../../config/log.yml", "IMENV_LOG", &log)
os.Setenv("IMENV_LOG_REMAINLOGLEVEL", "5")
err := Load("../../../config/", "log.yml", "IMENV_LOG", &log)
assert.Nil(t, err)
t.Log(log.RemainLogLevel)
// assert.Equal(t, "../../../../logs/", log.StorageLocation)
}

func TestLoadMongoConfig(t *testing.T) {
var mongo Mongo
// os.Setenv("DEPLOYMENT_TYPE", "kubernetes")
os.Setenv("IMENV_MONGODB_PASSWORD", "openIM1231231")
// os.Setenv("IMENV_MONGODB_URI", "openIM123")
// os.Setenv("IMENV_MONGODB_USERNAME", "openIM123")
err := Load("../../../config/", "mongodb.yml", "IMENV_MONGODB", &mongo)
// err := LoadConfig("../../../config/mongodb.yml", "IMENV_MONGODB", &mongo)

assert.Nil(t, err)
assert.Equal(t, "../../../../logs/", log.StorageLocation)
t.Log(mongo.Password)
// assert.Equal(t, "openIM123", mongo.Password)
t.Log(os.Getenv("IMENV_MONGODB_PASSWORD"))
t.Log(mongo)
// //export IMENV_OPENIM_RPC_USER_RPC_LISTENIP="0.0.0.0"
// assert.Equal(t, "0.0.0.0", user.RPC.ListenIP)
// //export IMENV_OPENIM_RPC_USER_RPC_PORTS="10110,10111,10112"
// assert.Equal(t, []int{10110, 10111, 10112}, user.RPC.Ports)
}

func TestLoadMinioConfig(t *testing.T) {
var storageConfig Minio
err := LoadConfig("../../../config/minio.yml", "IMENV_MINIO", &storageConfig)
err := Load("../../../config/minio.yml", "IMENV_MINIO", "", &storageConfig)
assert.Nil(t, err)
assert.Equal(t, "openim", storageConfig.Bucket)
}

func TestLoadWebhooksConfig(t *testing.T) {
var webhooks Webhooks
err := LoadConfig("../../../config/webhooks.yml", "IMENV_WEBHOOKS", &webhooks)
err := Load("../../../config/webhooks.yml", "IMENV_WEBHOOKS", "", &webhooks)
assert.Nil(t, err)
assert.Equal(t, 5, webhooks.BeforeAddBlack.Timeout)

}

func TestLoadOpenIMRpcUserConfig(t *testing.T) {
var user User
err := LoadConfig("../../../config/openim-rpc-user.yml", "IMENV_OPENIM_RPC_USER", &user)
err := Load("../../../config/openim-rpc-user.yml", "IMENV_OPENIM_RPC_USER", "", &user)
assert.Nil(t, err)
//export IMENV_OPENIM_RPC_USER_RPC_LISTENIP="0.0.0.0"
assert.Equal(t, "0.0.0.0", user.RPC.ListenIP)
Expand All @@ -39,14 +63,14 @@ func TestLoadOpenIMRpcUserConfig(t *testing.T) {

func TestLoadNotificationConfig(t *testing.T) {
var noti Notification
err := LoadConfig("../../../config/notification.yml", "IMENV_NOTIFICATION", &noti)
err := Load("../../../config/notification.yml", "IMENV_NOTIFICATION", "", &noti)
assert.Nil(t, err)
assert.Equal(t, "Your friend's profile has been changed", noti.FriendRemarkSet.OfflinePush.Title)
}

func TestLoadOpenIMThirdConfig(t *testing.T) {
var third Third
err := LoadConfig("../../../config/openim-rpc-third.yml", "IMENV_OPENIM_RPC_THIRD", &third)
err := Load("../../../config/openim-rpc-third.yml", "IMENV_OPENIM_RPC_THIRD", "", &third)
assert.Nil(t, err)
assert.Equal(t, "enabled", third.Object.Enable)
assert.Equal(t, "https://oss-cn-chengdu.aliyuncs.com", third.Object.Oss.Endpoint)
Expand Down
Loading

0 comments on commit 0d8f6dd

Please sign in to comment.