diff --git a/config/openim-push.yml b/config/openim-push.yml index e983246203..5db5b541a0 100644 --- a/config/openim-push.yml +++ b/config/openim-push.yml @@ -21,7 +21,7 @@ prometheus: maxConcurrentWorkers: 3 #Use geTui for offline push notifications, or choose fcm or jpns; corresponding configuration settings must be specified. enable: -geTui: +getui: pushUrl: https://restapi.getui.com/v2/$appId masterSecret: appKey: diff --git a/docker-compose.yml b/docker-compose.yml index 51dd4d04f0..0cdeebe43a 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -82,6 +82,8 @@ services: - ETCD_INITIAL_CLUSTER=s1=http://0.0.0.0:2380 - ETCD_INITIAL_CLUSTER_TOKEN=tkn - ETCD_INITIAL_CLUSTER_STATE=new + volumes: + - "${DATA_DIR}/components/etcd:/etcd-data" restart: always networks: - openim diff --git a/internal/api/config_manager.go b/internal/api/config_manager.go new file mode 100644 index 0000000000..c330cad461 --- /dev/null +++ b/internal/api/config_manager.go @@ -0,0 +1,250 @@ +package api + +import ( + "encoding/json" + "reflect" + "strconv" + "time" + + "github.com/gin-gonic/gin" + "github.com/openimsdk/open-im-server/v3/pkg/apistruct" + "github.com/openimsdk/open-im-server/v3/pkg/authverify" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "github.com/openimsdk/open-im-server/v3/pkg/common/discovery/etcd" + "github.com/openimsdk/open-im-server/v3/version" + "github.com/openimsdk/tools/apiresp" + "github.com/openimsdk/tools/errs" + "github.com/openimsdk/tools/log" + "github.com/openimsdk/tools/utils/runtimeenv" + clientv3 "go.etcd.io/etcd/client/v3" +) + +type ConfigManager struct { + imAdminUserID []string + config *config.AllConfig + client *clientv3.Client + configPath string + runtimeEnv string +} + +func NewConfigManager(IMAdminUserID []string, cfg *config.AllConfig, client *clientv3.Client, configPath string, runtimeEnv string) *ConfigManager { + return &ConfigManager{ + imAdminUserID: IMAdminUserID, + config: cfg, + client: client, + configPath: configPath, + runtimeEnv: runtimeEnv, + } +} + +func (cm *ConfigManager) CheckAdmin(c *gin.Context) { + if err := authverify.CheckAdmin(c, cm.imAdminUserID); err != nil { + apiresp.GinError(c, err) + c.Abort() + } +} + +func (cm *ConfigManager) GetConfig(c *gin.Context) { + var req apistruct.GetConfigReq + if err := c.BindJSON(&req); err != nil { + apiresp.GinError(c, errs.ErrArgs.WithDetail(err.Error()).Wrap()) + return + } + conf := cm.config.Name2Config(req.ConfigName) + if conf == nil { + apiresp.GinError(c, errs.ErrArgs.WithDetail("config name not found").Wrap()) + return + } + b, err := json.Marshal(conf) + if err != nil { + apiresp.GinError(c, err) + return + } + apiresp.GinSuccess(c, string(b)) +} + +func (cm *ConfigManager) GetConfigList(c *gin.Context) { + var resp apistruct.GetConfigListResp + resp.ConfigNames = cm.config.GetConfigNames() + resp.Environment = runtimeenv.PrintRuntimeEnvironment() + resp.Version = version.Version + + apiresp.GinSuccess(c, resp) +} + +func (cm *ConfigManager) SetConfig(c *gin.Context) { + if cm.config.Discovery.Enable != config.ETCD { + apiresp.GinError(c, errs.New("only etcd support set config").Wrap()) + return + } + var req apistruct.SetConfigReq + if err := c.BindJSON(&req); err != nil { + apiresp.GinError(c, errs.ErrArgs.WithDetail(err.Error()).Wrap()) + return + } + var err error + switch req.ConfigName { + case cm.config.Discovery.GetConfigFileName(): + err = compareAndSave[config.Discovery](c, cm.config.Name2Config(req.ConfigName), &req, cm.client) + case cm.config.Kafka.GetConfigFileName(): + err = compareAndSave[config.Kafka](c, cm.config.Name2Config(req.ConfigName), &req, cm.client) + case cm.config.LocalCache.GetConfigFileName(): + err = compareAndSave[config.LocalCache](c, cm.config.Name2Config(req.ConfigName), &req, cm.client) + case cm.config.Log.GetConfigFileName(): + err = compareAndSave[config.Log](c, cm.config.Name2Config(req.ConfigName), &req, cm.client) + case cm.config.Minio.GetConfigFileName(): + err = compareAndSave[config.Minio](c, cm.config.Name2Config(req.ConfigName), &req, cm.client) + case cm.config.Mongo.GetConfigFileName(): + err = compareAndSave[config.Mongo](c, cm.config.Name2Config(req.ConfigName), &req, cm.client) + case cm.config.Notification.GetConfigFileName(): + err = compareAndSave[config.Notification](c, cm.config.Name2Config(req.ConfigName), &req, cm.client) + case cm.config.API.GetConfigFileName(): + err = compareAndSave[config.API](c, cm.config.Name2Config(req.ConfigName), &req, cm.client) + case cm.config.CronTask.GetConfigFileName(): + err = compareAndSave[config.CronTask](c, cm.config.Name2Config(req.ConfigName), &req, cm.client) + case cm.config.MsgGateway.GetConfigFileName(): + err = compareAndSave[config.MsgGateway](c, cm.config.Name2Config(req.ConfigName), &req, cm.client) + case cm.config.MsgTransfer.GetConfigFileName(): + err = compareAndSave[config.MsgTransfer](c, cm.config.Name2Config(req.ConfigName), &req, cm.client) + case cm.config.Push.GetConfigFileName(): + err = compareAndSave[config.Push](c, cm.config.Name2Config(req.ConfigName), &req, cm.client) + case cm.config.Auth.GetConfigFileName(): + err = compareAndSave[config.Auth](c, cm.config.Name2Config(req.ConfigName), &req, cm.client) + case cm.config.Conversation.GetConfigFileName(): + err = compareAndSave[config.Conversation](c, cm.config.Name2Config(req.ConfigName), &req, cm.client) + case cm.config.Friend.GetConfigFileName(): + err = compareAndSave[config.Friend](c, cm.config.Name2Config(req.ConfigName), &req, cm.client) + case cm.config.Group.GetConfigFileName(): + err = compareAndSave[config.Group](c, cm.config.Name2Config(req.ConfigName), &req, cm.client) + case cm.config.Msg.GetConfigFileName(): + err = compareAndSave[config.Msg](c, cm.config.Name2Config(req.ConfigName), &req, cm.client) + case cm.config.Third.GetConfigFileName(): + err = compareAndSave[config.Third](c, cm.config.Name2Config(req.ConfigName), &req, cm.client) + case cm.config.User.GetConfigFileName(): + err = compareAndSave[config.User](c, cm.config.Name2Config(req.ConfigName), &req, cm.client) + case cm.config.Redis.GetConfigFileName(): + err = compareAndSave[config.Redis](c, cm.config.Name2Config(req.ConfigName), &req, cm.client) + case cm.config.Share.GetConfigFileName(): + err = compareAndSave[config.Share](c, cm.config.Name2Config(req.ConfigName), &req, cm.client) + case cm.config.Webhooks.GetConfigFileName(): + err = compareAndSave[config.Webhooks](c, cm.config.Name2Config(req.ConfigName), &req, cm.client) + default: + apiresp.GinError(c, errs.ErrArgs.Wrap()) + return + } + if err != nil { + apiresp.GinError(c, errs.ErrArgs.WithDetail(err.Error()).Wrap()) + return + } + apiresp.GinSuccess(c, nil) +} + +func compareAndSave[T any](c *gin.Context, old any, req *apistruct.SetConfigReq, client *clientv3.Client) error { + conf := new(T) + err := json.Unmarshal([]byte(req.Data), &conf) + if err != nil { + return errs.ErrArgs.WithDetail(err.Error()).Wrap() + } + eq := reflect.DeepEqual(old, conf) + if eq { + return nil + } + data, err := json.Marshal(conf) + if err != nil { + return errs.ErrArgs.WithDetail(err.Error()).Wrap() + } + _, err = client.Put(c, etcd.BuildKey(req.ConfigName), string(data)) + if err != nil { + return errs.WrapMsg(err, "save to etcd failed") + } + return nil +} + +func (cm *ConfigManager) ResetConfig(c *gin.Context) { + go cm.resetConfig(c) + apiresp.GinSuccess(c, nil) +} + +func (cm *ConfigManager) resetConfig(c *gin.Context) { + txn := cm.client.Txn(c) + type initConf struct { + old any + new any + isChanged bool + } + configMap := map[string]*initConf{ + cm.config.Discovery.GetConfigFileName(): {old: &cm.config.Discovery, new: new(config.Discovery)}, + cm.config.Kafka.GetConfigFileName(): {old: &cm.config.Kafka, new: new(config.Kafka)}, + cm.config.LocalCache.GetConfigFileName(): {old: &cm.config.LocalCache, new: new(config.LocalCache)}, + cm.config.Log.GetConfigFileName(): {old: &cm.config.Log, new: new(config.Log)}, + cm.config.Minio.GetConfigFileName(): {old: &cm.config.Minio, new: new(config.Minio)}, + cm.config.Mongo.GetConfigFileName(): {old: &cm.config.Mongo, new: new(config.Mongo)}, + cm.config.Notification.GetConfigFileName(): {old: &cm.config.Notification, new: new(config.Notification)}, + cm.config.API.GetConfigFileName(): {old: &cm.config.API, new: new(config.API)}, + cm.config.CronTask.GetConfigFileName(): {old: &cm.config.CronTask, new: new(config.CronTask)}, + cm.config.MsgGateway.GetConfigFileName(): {old: &cm.config.MsgGateway, new: new(config.MsgGateway)}, + cm.config.MsgTransfer.GetConfigFileName(): {old: &cm.config.MsgTransfer, new: new(config.MsgTransfer)}, + cm.config.Push.GetConfigFileName(): {old: &cm.config.Push, new: new(config.Push)}, + cm.config.Auth.GetConfigFileName(): {old: &cm.config.Auth, new: new(config.Auth)}, + cm.config.Conversation.GetConfigFileName(): {old: &cm.config.Conversation, new: new(config.Conversation)}, + cm.config.Friend.GetConfigFileName(): {old: &cm.config.Friend, new: new(config.Friend)}, + cm.config.Group.GetConfigFileName(): {old: &cm.config.Group, new: new(config.Group)}, + cm.config.Msg.GetConfigFileName(): {old: &cm.config.Msg, new: new(config.Msg)}, + cm.config.Third.GetConfigFileName(): {old: &cm.config.Third, new: new(config.Third)}, + cm.config.User.GetConfigFileName(): {old: &cm.config.User, new: new(config.User)}, + cm.config.Redis.GetConfigFileName(): {old: &cm.config.Redis, new: new(config.Redis)}, + cm.config.Share.GetConfigFileName(): {old: &cm.config.Share, new: new(config.Share)}, + cm.config.Webhooks.GetConfigFileName(): {old: &cm.config.Webhooks, new: new(config.Webhooks)}, + } + + changedKeys := make([]string, 0, len(configMap)) + for k, v := range configMap { + err := config.Load( + cm.configPath, + k, + config.EnvPrefixMap[k], + cm.runtimeEnv, + v.new, + ) + if err != nil { + log.ZError(c, "load config failed", err) + continue + } + v.isChanged = reflect.DeepEqual(v.old, v.new) + if !v.isChanged { + changedKeys = append(changedKeys, k) + } + } + + ops := make([]clientv3.Op, 0) + for _, k := range changedKeys { + data, err := json.Marshal(configMap[k].new) + if err != nil { + log.ZError(c, "marshal config failed", err) + continue + } + ops = append(ops, clientv3.OpPut(etcd.BuildKey(k), string(data))) + } + if len(ops) > 0 { + txn.Then(ops...) + _, err := txn.Commit() + if err != nil { + log.ZError(c, "commit etcd txn failed", err) + return + } + } +} + +func (cm *ConfigManager) Restart(c *gin.Context) { + go cm.restart(c) + apiresp.GinSuccess(c, nil) +} + +func (cm *ConfigManager) restart(c *gin.Context) { + time.Sleep(time.Millisecond * 200) // wait for Restart http call return + t := time.Now().Unix() + _, err := cm.client.Put(c, etcd.BuildKey(etcd.RestartKey), strconv.Itoa(int(t))) + if err != nil { + log.ZError(c, "restart etcd put key failed", err) + } +} diff --git a/internal/api/init.go b/internal/api/init.go index 4a558c8b6c..780ecb913c 100644 --- a/internal/api/init.go +++ b/internal/api/init.go @@ -27,7 +27,8 @@ import ( "time" conf "github.com/openimsdk/open-im-server/v3/pkg/common/config" - kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister" + kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discovery" + disetcd "github.com/openimsdk/open-im-server/v3/pkg/common/discovery/etcd" "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" "github.com/openimsdk/tools/discovery/etcd" "github.com/openimsdk/tools/errs" @@ -43,11 +44,10 @@ import ( ) type Config struct { - API conf.API - Share conf.Share - Discovery conf.Discovery + *conf.AllConfig RuntimeEnv string + ConfigPath string } func Start(ctx context.Context, index int, config *Config) error { @@ -139,22 +139,33 @@ func Start(ctx context.Context, index int, config *Config) error { if err != nil && !errors.Is(err, http.ErrServerClosed) { netErr = errs.WrapMsg(err, fmt.Sprintf("api start err: %s", server.Addr)) netDone <- struct{}{} - } }() + if config.Discovery.Enable == conf.ETCD { + cm := disetcd.NewConfigManager(client.(*etcd.SvcDiscoveryRegistryImpl).GetClient(), config.GetConfigNames()) + cm.Watch(ctx) + } + sigs := make(chan os.Signal, 1) signal.Notify(sigs, syscall.SIGTERM) - ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) - defer cancel() - select { - case <-sigs: - program.SIGTERMExit() + shutdown := func() error { + ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) + defer cancel() err := server.Shutdown(ctx) if err != nil { return errs.WrapMsg(err, "shutdown err") } + return nil + } + disetcd.RegisterShutDown(shutdown) + select { + case <-sigs: + program.SIGTERMExit() + if err := shutdown(); err != nil { + return err + } case <-netDone: close(netDone) return netErr diff --git a/internal/api/router.go b/internal/api/router.go index 62b8079e61..789b8205d5 100644 --- a/internal/api/router.go +++ b/internal/api/router.go @@ -3,34 +3,34 @@ package api import ( "context" "fmt" - "github.com/openimsdk/open-im-server/v3/pkg/rpcli" - "github.com/openimsdk/protocol/conversation" - "github.com/openimsdk/protocol/group" - "github.com/openimsdk/protocol/msg" - "github.com/openimsdk/protocol/relation" - "github.com/openimsdk/protocol/third" - "github.com/openimsdk/protocol/user" "net/http" "strings" - "github.com/openimsdk/open-im-server/v3/internal/api/jssdk" - pbAuth "github.com/openimsdk/protocol/auth" - "github.com/gin-contrib/gzip" - "github.com/gin-gonic/gin" "github.com/gin-gonic/gin/binding" "github.com/go-playground/validator/v10" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials/insecure" - + "github.com/openimsdk/open-im-server/v3/internal/api/jssdk" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" "github.com/openimsdk/open-im-server/v3/pkg/common/servererrs" + "github.com/openimsdk/open-im-server/v3/pkg/rpcli" + pbAuth "github.com/openimsdk/protocol/auth" "github.com/openimsdk/protocol/constant" + "github.com/openimsdk/protocol/conversation" + "github.com/openimsdk/protocol/group" + "github.com/openimsdk/protocol/msg" + "github.com/openimsdk/protocol/relation" + "github.com/openimsdk/protocol/third" + "github.com/openimsdk/protocol/user" "github.com/openimsdk/tools/apiresp" "github.com/openimsdk/tools/discovery" + "github.com/openimsdk/tools/discovery/etcd" "github.com/openimsdk/tools/log" "github.com/openimsdk/tools/mw" + clientv3 "go.etcd.io/etcd/client/v3" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" ) const ( @@ -55,34 +55,34 @@ func prommetricsGin() gin.HandlerFunc { } } -func newGinRouter(ctx context.Context, client discovery.SvcDiscoveryRegistry, config *Config) (*gin.Engine, error) { +func newGinRouter(ctx context.Context, client discovery.SvcDiscoveryRegistry, cfg *Config) (*gin.Engine, error) { client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin"))) - authConn, err := client.GetConn(ctx, config.Discovery.RpcService.Auth) + authConn, err := client.GetConn(ctx, cfg.Discovery.RpcService.Auth) if err != nil { return nil, err } - userConn, err := client.GetConn(ctx, config.Discovery.RpcService.User) + userConn, err := client.GetConn(ctx, cfg.Discovery.RpcService.User) if err != nil { return nil, err } - groupConn, err := client.GetConn(ctx, config.Discovery.RpcService.Group) + groupConn, err := client.GetConn(ctx, cfg.Discovery.RpcService.Group) if err != nil { return nil, err } - friendConn, err := client.GetConn(ctx, config.Discovery.RpcService.Friend) + friendConn, err := client.GetConn(ctx, cfg.Discovery.RpcService.Friend) if err != nil { return nil, err } - conversationConn, err := client.GetConn(ctx, config.Discovery.RpcService.Conversation) + conversationConn, err := client.GetConn(ctx, cfg.Discovery.RpcService.Conversation) if err != nil { return nil, err } - thirdConn, err := client.GetConn(ctx, config.Discovery.RpcService.Third) + thirdConn, err := client.GetConn(ctx, cfg.Discovery.RpcService.Third) if err != nil { return nil, err } - msgConn, err := client.GetConn(ctx, config.Discovery.RpcService.Msg) + msgConn, err := client.GetConn(ctx, cfg.Discovery.RpcService.Msg) if err != nil { return nil, err } @@ -91,7 +91,7 @@ func newGinRouter(ctx context.Context, client discovery.SvcDiscoveryRegistry, co if v, ok := binding.Validator.Engine().(*validator.Validate); ok { _ = v.RegisterValidation("required_if", RequiredIf) } - switch config.API.Api.CompressionLevel { + switch cfg.API.Api.CompressionLevel { case NoCompression: case DefaultCompression: r.Use(gzip.Gzip(gzip.DefaultCompression)) @@ -103,7 +103,7 @@ func newGinRouter(ctx context.Context, client discovery.SvcDiscoveryRegistry, co r.Use(prommetricsGin(), gin.RecoveryWithWriter(gin.DefaultErrorWriter, mw.GinPanicErr), mw.CorsHandler(), mw.GinParseOperationID(), GinParseToken(rpcli.NewAuthClient(authConn))) j := jssdk.NewJSSdkApi() - u := NewUserApi(user.NewUserClient(userConn), client, config.Discovery.RpcService) + u := NewUserApi(user.NewUserClient(userConn), client, cfg.Discovery.RpcService) { userRouterGroup := r.Group("/user") userRouterGroup.POST("/user_register", u.UserRegister) @@ -204,7 +204,7 @@ func newGinRouter(ctx context.Context, client discovery.SvcDiscoveryRegistry, co } // Third service { - t := NewThirdApi(third.NewThirdClient(thirdConn), config.API.Prometheus.GrafanaURL) + t := NewThirdApi(third.NewThirdClient(thirdConn), cfg.API.Prometheus.GrafanaURL) thirdGroup := r.Group("/third") thirdGroup.GET("/prometheus", t.GetPrometheus) thirdGroup.POST("/fcm_update_token", t.FcmUpdateToken) @@ -228,7 +228,7 @@ func newGinRouter(ctx context.Context, client discovery.SvcDiscoveryRegistry, co objectGroup.GET("/*name", t.ObjectRedirect) } // Message - m := NewMessageApi(msg.NewMsgClient(msgConn), rpcli.NewUserClient(userConn), config.Share.IMAdminUserID) + m := NewMessageApi(msg.NewMsgClient(msgConn), rpcli.NewUserClient(userConn), cfg.Share.IMAdminUserID) { msgGroup := r.Group("/msg") msgGroup.POST("/newest_seq", m.GetSeq) @@ -285,7 +285,7 @@ func newGinRouter(ctx context.Context, client discovery.SvcDiscoveryRegistry, co jssdk.POST("/get_active_conversations", j.GetActiveConversations) } { - pd := NewPrometheusDiscoveryApi(config, client) + pd := NewPrometheusDiscoveryApi(cfg, client) proDiscoveryGroup := r.Group("/prometheus_discovery", pd.Enable) proDiscoveryGroup.GET("/api", pd.Api) proDiscoveryGroup.GET("/user", pd.User) @@ -300,6 +300,22 @@ func newGinRouter(ctx context.Context, client discovery.SvcDiscoveryRegistry, co proDiscoveryGroup.GET("/msg_transfer", pd.MessageTransfer) } + var etcdClient *clientv3.Client + if cfg.Discovery.Enable == config.ETCD { + etcdClient = client.(*etcd.SvcDiscoveryRegistryImpl).GetClient() + } + cm := NewConfigManager(cfg.Share.IMAdminUserID, cfg.AllConfig, etcdClient, cfg.ConfigPath, cfg.RuntimeEnv) + { + + configGroup := r.Group("/config", cm.CheckAdmin) + configGroup.POST("/get_config_list", cm.GetConfigList) + configGroup.POST("/get_config", cm.GetConfig) + configGroup.POST("/set_config", cm.SetConfig) + configGroup.POST("/reset_config", cm.ResetConfig) + } + { + r.POST("/restart", cm.Restart) + } return r, nil } diff --git a/internal/msggateway/hub_server.go b/internal/msggateway/hub_server.go index 7533267260..52afe495b6 100644 --- a/internal/msggateway/hub_server.go +++ b/internal/msggateway/hub_server.go @@ -57,6 +57,13 @@ func (s *Server) Start(ctx context.Context, index int, conf *Config) error { conf.Discovery.RpcService.MessageGateway, nil, conf, + []string{ + conf.Share.GetConfigFileName(), + conf.Discovery.GetConfigFileName(), + conf.MsgGateway.GetConfigFileName(), + conf.WebhooksConfig.GetConfigFileName(), + conf.RedisConfig.GetConfigFileName(), + }, s.InitServer, ) } diff --git a/internal/msggateway/ws_server.go b/internal/msggateway/ws_server.go index 44b6ddb89f..24dd823f6f 100644 --- a/internal/msggateway/ws_server.go +++ b/internal/msggateway/ws_server.go @@ -2,13 +2,16 @@ package msggateway import ( "context" + "errors" "fmt" - "github.com/openimsdk/open-im-server/v3/pkg/rpcli" "net/http" "sync" "sync/atomic" "time" + "github.com/openimsdk/open-im-server/v3/pkg/rpcli" + + "github.com/openimsdk/open-im-server/v3/pkg/common/discovery/etcd" "github.com/openimsdk/open-im-server/v3/pkg/common/webhook" "github.com/openimsdk/open-im-server/v3/pkg/rpccache" pbAuth "github.com/openimsdk/protocol/auth" @@ -182,21 +185,28 @@ func (ws *WsServer) Run(done chan error) error { go func() { http.HandleFunc("/", ws.wsHandler) err := server.ListenAndServe() - defer close(netDone) - if err != nil && err != http.ErrServerClosed { + if err != nil && !errors.Is(err, http.ErrServerClosed) { netErr = errs.WrapMsg(err, "ws start err", server.Addr) + netDone <- struct{}{} } }() ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) - defer cancel() - var err error - select { - case err = <-done: + shutDown := func() error { sErr := server.Shutdown(ctx) if sErr != nil { return errs.WrapMsg(sErr, "shutdown err") } close(shutdownDone) + return nil + } + etcd.RegisterShutDown(shutDown) + defer cancel() + var err error + select { + case err = <-done: + if err := shutDown(); err != nil { + return err + } if err != nil { return err } diff --git a/internal/msgtransfer/init.go b/internal/msgtransfer/init.go index 5cb6131238..b32732b60d 100644 --- a/internal/msgtransfer/init.go +++ b/internal/msgtransfer/init.go @@ -25,6 +25,7 @@ import ( "strconv" "syscall" + disetcd "github.com/openimsdk/open-im-server/v3/pkg/common/discovery/etcd" "github.com/openimsdk/tools/discovery" "github.com/openimsdk/tools/discovery/etcd" "github.com/openimsdk/tools/utils/jsonutil" @@ -39,7 +40,7 @@ import ( "github.com/openimsdk/tools/utils/runtimeenv" conf "github.com/openimsdk/open-im-server/v3/pkg/common/config" - discRegister "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister" + discRegister "github.com/openimsdk/open-im-server/v3/pkg/common/discovery" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller" "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/log" @@ -92,6 +93,21 @@ func Start(ctx context.Context, index int, config *Config) error { } client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin"))) + + if config.Discovery.Enable == conf.ETCD { + cm := disetcd.NewConfigManager(client.(*etcd.SvcDiscoveryRegistryImpl).GetClient(), []string{ + config.MsgTransfer.GetConfigFileName(), + config.RedisConfig.GetConfigFileName(), + config.MongodbConfig.GetConfigFileName(), + config.KafkaConfig.GetConfigFileName(), + config.Share.GetConfigFileName(), + config.WebhooksConfig.GetConfigFileName(), + config.Discovery.GetConfigFileName(), + conf.LogConfigFileName, + }) + cm.Watch(ctx) + } + msgModel := redis.NewMsgCache(rdb) msgDocModel, err := mgo.NewMsgMongo(mgocli.GetDB()) if err != nil { @@ -125,6 +141,7 @@ func Start(ctx context.Context, index int, config *Config) error { historyMongoCH: historyMongoCH, runTimeEnv: runTimeEnv, } + return msgTransfer.Start(index, config, client) } diff --git a/internal/tools/cron_task.go b/internal/tools/cron_task.go index 049a6199cf..71fd886f6e 100644 --- a/internal/tools/cron_task.go +++ b/internal/tools/cron_task.go @@ -3,7 +3,7 @@ package tools import ( "context" "github.com/openimsdk/open-im-server/v3/pkg/common/config" - kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister" + kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discovery" pbconversation "github.com/openimsdk/protocol/conversation" "github.com/openimsdk/protocol/msg" "github.com/openimsdk/protocol/third" diff --git a/internal/tools/cron_test.go b/internal/tools/cron_test.go index 8903490698..b4082a5a54 100644 --- a/internal/tools/cron_test.go +++ b/internal/tools/cron_test.go @@ -2,8 +2,10 @@ package tools import ( "context" + "testing" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" - kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister" + kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discovery" pbconversation "github.com/openimsdk/protocol/conversation" "github.com/openimsdk/protocol/msg" "github.com/openimsdk/protocol/third" @@ -12,7 +14,6 @@ import ( "github.com/robfig/cron/v3" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" - "testing" ) func TestName(t *testing.T) { diff --git a/pkg/apistruct/config_manager.go b/pkg/apistruct/config_manager.go new file mode 100644 index 0000000000..84b8fb36bf --- /dev/null +++ b/pkg/apistruct/config_manager.go @@ -0,0 +1,16 @@ +package apistruct + +type GetConfigReq struct { + ConfigName string `json:"configName"` +} + +type GetConfigListResp struct { + Environment string `json:"environment"` + Version string `json:"version"` + ConfigNames []string `json:"configNames"` +} + +type SetConfigReq struct { + ConfigName string `json:"configName"` + Data string `json:"data"` +} diff --git a/pkg/common/cmd/api.go b/pkg/common/cmd/api.go index 4088ecd09d..050b313ffc 100644 --- a/pkg/common/cmd/api.go +++ b/pkg/common/cmd/api.go @@ -18,6 +18,7 @@ import ( "context" "github.com/openimsdk/open-im-server/v3/internal/api" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/version" "github.com/openimsdk/tools/system/program" "github.com/spf13/cobra" @@ -31,16 +32,36 @@ type ApiCmd struct { } func NewApiCmd() *ApiCmd { - var apiConfig api.Config + apiConfig := api.Config{AllConfig: &config.AllConfig{}} ret := &ApiCmd{apiConfig: &apiConfig} ret.configMap = map[string]any{ - OpenIMAPICfgFileName: &apiConfig.API, - ShareFileName: &apiConfig.Share, - DiscoveryConfigFilename: &apiConfig.Discovery, + config.DiscoveryConfigFilename: &apiConfig.Discovery, + config.KafkaConfigFileName: &apiConfig.Kafka, + config.LocalCacheConfigFileName: &apiConfig.LocalCache, + config.LogConfigFileName: &apiConfig.Log, + config.MinioConfigFileName: &apiConfig.Minio, + config.MongodbConfigFileName: &apiConfig.Mongo, + config.NotificationFileName: &apiConfig.Notification, + config.OpenIMAPICfgFileName: &apiConfig.API, + config.OpenIMCronTaskCfgFileName: &apiConfig.CronTask, + config.OpenIMMsgGatewayCfgFileName: &apiConfig.MsgGateway, + config.OpenIMMsgTransferCfgFileName: &apiConfig.MsgTransfer, + config.OpenIMPushCfgFileName: &apiConfig.Push, + config.OpenIMRPCAuthCfgFileName: &apiConfig.Auth, + config.OpenIMRPCConversationCfgFileName: &apiConfig.Conversation, + config.OpenIMRPCFriendCfgFileName: &apiConfig.Friend, + config.OpenIMRPCGroupCfgFileName: &apiConfig.Group, + config.OpenIMRPCMsgCfgFileName: &apiConfig.Msg, + config.OpenIMRPCThirdCfgFileName: &apiConfig.Third, + config.OpenIMRPCUserCfgFileName: &apiConfig.User, + config.RedisConfigFileName: &apiConfig.Redis, + config.ShareFileName: &apiConfig.Share, + config.WebhooksConfigFileName: &apiConfig.Webhooks, } ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap)) ret.ctx = context.WithValue(context.Background(), "version", version.Version) ret.Command.RunE = func(cmd *cobra.Command, args []string) error { + apiConfig.ConfigPath = ret.configPath return ret.runE() } return ret diff --git a/pkg/common/cmd/auth.go b/pkg/common/cmd/auth.go index bf51a2cf3c..a5ab3fea7a 100644 --- a/pkg/common/cmd/auth.go +++ b/pkg/common/cmd/auth.go @@ -18,6 +18,7 @@ import ( "context" "github.com/openimsdk/open-im-server/v3/internal/rpc/auth" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/startrpc" "github.com/openimsdk/open-im-server/v3/version" "github.com/openimsdk/tools/system/program" @@ -35,10 +36,10 @@ func NewAuthRpcCmd() *AuthRpcCmd { var authConfig auth.Config ret := &AuthRpcCmd{authConfig: &authConfig} ret.configMap = map[string]any{ - OpenIMRPCAuthCfgFileName: &authConfig.RpcConfig, - RedisConfigFileName: &authConfig.RedisConfig, - ShareFileName: &authConfig.Share, - DiscoveryConfigFilename: &authConfig.Discovery, + config.OpenIMRPCAuthCfgFileName: &authConfig.RpcConfig, + config.RedisConfigFileName: &authConfig.RedisConfig, + config.ShareFileName: &authConfig.Share, + config.DiscoveryConfigFilename: &authConfig.Discovery, } ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap)) ret.ctx = context.WithValue(context.Background(), "version", version.Version) @@ -56,5 +57,12 @@ func (a *AuthRpcCmd) Exec() error { func (a *AuthRpcCmd) runE() error { return startrpc.Start(a.ctx, &a.authConfig.Discovery, &a.authConfig.RpcConfig.Prometheus, a.authConfig.RpcConfig.RPC.ListenIP, a.authConfig.RpcConfig.RPC.RegisterIP, a.authConfig.RpcConfig.RPC.AutoSetPorts, a.authConfig.RpcConfig.RPC.Ports, - a.Index(), a.authConfig.Discovery.RpcService.Auth, nil, a.authConfig, auth.Start) + a.Index(), a.authConfig.Discovery.RpcService.Auth, nil, a.authConfig, + []string{ + a.authConfig.RpcConfig.GetConfigFileName(), + a.authConfig.Share.GetConfigFileName(), + a.authConfig.RedisConfig.GetConfigFileName(), + a.authConfig.Discovery.GetConfigFileName(), + }, + auth.Start) } diff --git a/pkg/common/cmd/constant.go b/pkg/common/cmd/constant.go deleted file mode 100644 index 45dbcafda2..0000000000 --- a/pkg/common/cmd/constant.go +++ /dev/null @@ -1,96 +0,0 @@ -// Copyright © 2023 OpenIM. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package cmd - -import ( - "strings" -) - -var ( - FileName string - NotificationFileName string - ShareFileName string - WebhooksConfigFileName string - LocalCacheConfigFileName string - KafkaConfigFileName string - RedisConfigFileName string - MongodbConfigFileName string - MinioConfigFileName string - LogConfigFileName string - OpenIMAPICfgFileName string - OpenIMCronTaskCfgFileName string - OpenIMMsgGatewayCfgFileName string - OpenIMMsgTransferCfgFileName string - OpenIMPushCfgFileName string - OpenIMRPCAuthCfgFileName string - OpenIMRPCConversationCfgFileName string - OpenIMRPCFriendCfgFileName string - OpenIMRPCGroupCfgFileName string - OpenIMRPCMsgCfgFileName string - OpenIMRPCThirdCfgFileName string - OpenIMRPCUserCfgFileName string - DiscoveryConfigFilename string -) - -var ConfigEnvPrefixMap map[string]string - -func init() { - FileName = "config.yaml" - NotificationFileName = "notification.yml" - ShareFileName = "share.yml" - WebhooksConfigFileName = "webhooks.yml" - LocalCacheConfigFileName = "local-cache.yml" - KafkaConfigFileName = "kafka.yml" - RedisConfigFileName = "redis.yml" - MongodbConfigFileName = "mongodb.yml" - MinioConfigFileName = "minio.yml" - LogConfigFileName = "log.yml" - OpenIMAPICfgFileName = "openim-api.yml" - OpenIMCronTaskCfgFileName = "openim-crontask.yml" - OpenIMMsgGatewayCfgFileName = "openim-msggateway.yml" - OpenIMMsgTransferCfgFileName = "openim-msgtransfer.yml" - OpenIMPushCfgFileName = "openim-push.yml" - OpenIMRPCAuthCfgFileName = "openim-rpc-auth.yml" - OpenIMRPCConversationCfgFileName = "openim-rpc-conversation.yml" - OpenIMRPCFriendCfgFileName = "openim-rpc-friend.yml" - OpenIMRPCGroupCfgFileName = "openim-rpc-group.yml" - OpenIMRPCMsgCfgFileName = "openim-rpc-msg.yml" - OpenIMRPCThirdCfgFileName = "openim-rpc-third.yml" - OpenIMRPCUserCfgFileName = "openim-rpc-user.yml" - DiscoveryConfigFilename = "discovery.yml" - - ConfigEnvPrefixMap = make(map[string]string) - fileNames := []string{ - FileName, NotificationFileName, ShareFileName, WebhooksConfigFileName, - KafkaConfigFileName, RedisConfigFileName, - MongodbConfigFileName, MinioConfigFileName, LogConfigFileName, - OpenIMAPICfgFileName, OpenIMCronTaskCfgFileName, OpenIMMsgGatewayCfgFileName, - OpenIMMsgTransferCfgFileName, OpenIMPushCfgFileName, OpenIMRPCAuthCfgFileName, - OpenIMRPCConversationCfgFileName, OpenIMRPCFriendCfgFileName, OpenIMRPCGroupCfgFileName, - OpenIMRPCMsgCfgFileName, OpenIMRPCThirdCfgFileName, OpenIMRPCUserCfgFileName, DiscoveryConfigFilename, - } - - for _, fileName := range fileNames { - envKey := strings.TrimSuffix(strings.TrimSuffix(fileName, ".yml"), ".yaml") - envKey = "IMENV_" + envKey - envKey = strings.ToUpper(strings.ReplaceAll(envKey, "-", "_")) - ConfigEnvPrefixMap[fileName] = envKey - } -} - -const ( - FlagConf = "config_folder_path" - FlagTransferIndex = "index" -) diff --git a/pkg/common/cmd/conversation.go b/pkg/common/cmd/conversation.go index 4d38f7fd47..12c29a8731 100644 --- a/pkg/common/cmd/conversation.go +++ b/pkg/common/cmd/conversation.go @@ -18,6 +18,7 @@ import ( "context" "github.com/openimsdk/open-im-server/v3/internal/rpc/conversation" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/startrpc" "github.com/openimsdk/open-im-server/v3/version" "github.com/openimsdk/tools/system/program" @@ -35,13 +36,13 @@ func NewConversationRpcCmd() *ConversationRpcCmd { var conversationConfig conversation.Config ret := &ConversationRpcCmd{conversationConfig: &conversationConfig} ret.configMap = map[string]any{ - OpenIMRPCConversationCfgFileName: &conversationConfig.RpcConfig, - RedisConfigFileName: &conversationConfig.RedisConfig, - MongodbConfigFileName: &conversationConfig.MongodbConfig, - ShareFileName: &conversationConfig.Share, - NotificationFileName: &conversationConfig.NotificationConfig, - LocalCacheConfigFileName: &conversationConfig.LocalCacheConfig, - DiscoveryConfigFilename: &conversationConfig.Discovery, + config.OpenIMRPCConversationCfgFileName: &conversationConfig.RpcConfig, + config.RedisConfigFileName: &conversationConfig.RedisConfig, + config.MongodbConfigFileName: &conversationConfig.MongodbConfig, + config.ShareFileName: &conversationConfig.Share, + config.NotificationFileName: &conversationConfig.NotificationConfig, + config.LocalCacheConfigFileName: &conversationConfig.LocalCacheConfig, + config.DiscoveryConfigFilename: &conversationConfig.Discovery, } ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap)) ret.ctx = context.WithValue(context.Background(), "version", version.Version) @@ -58,5 +59,15 @@ func (a *ConversationRpcCmd) Exec() error { func (a *ConversationRpcCmd) runE() error { return startrpc.Start(a.ctx, &a.conversationConfig.Discovery, &a.conversationConfig.RpcConfig.Prometheus, a.conversationConfig.RpcConfig.RPC.ListenIP, a.conversationConfig.RpcConfig.RPC.RegisterIP, a.conversationConfig.RpcConfig.RPC.AutoSetPorts, a.conversationConfig.RpcConfig.RPC.Ports, - a.Index(), a.conversationConfig.Discovery.RpcService.Conversation, &a.conversationConfig.NotificationConfig, a.conversationConfig, conversation.Start) + a.Index(), a.conversationConfig.Discovery.RpcService.Conversation, &a.conversationConfig.NotificationConfig, a.conversationConfig, + []string{ + a.conversationConfig.RpcConfig.GetConfigFileName(), + a.conversationConfig.RedisConfig.GetConfigFileName(), + a.conversationConfig.MongodbConfig.GetConfigFileName(), + a.conversationConfig.NotificationConfig.GetConfigFileName(), + a.conversationConfig.Share.GetConfigFileName(), + a.conversationConfig.LocalCacheConfig.GetConfigFileName(), + a.conversationConfig.Discovery.GetConfigFileName(), + }, + conversation.Start) } diff --git a/pkg/common/cmd/cron_task.go b/pkg/common/cmd/cron_task.go index d6c5e472e1..e7eb0ce18e 100644 --- a/pkg/common/cmd/cron_task.go +++ b/pkg/common/cmd/cron_task.go @@ -18,6 +18,7 @@ import ( "context" "github.com/openimsdk/open-im-server/v3/internal/tools" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/version" "github.com/openimsdk/tools/system/program" "github.com/spf13/cobra" @@ -34,9 +35,9 @@ func NewCronTaskCmd() *CronTaskCmd { var cronTaskConfig tools.CronTaskConfig ret := &CronTaskCmd{cronTaskConfig: &cronTaskConfig} ret.configMap = map[string]any{ - OpenIMCronTaskCfgFileName: &cronTaskConfig.CronTask, - ShareFileName: &cronTaskConfig.Share, - DiscoveryConfigFilename: &cronTaskConfig.Discovery, + config.OpenIMCronTaskCfgFileName: &cronTaskConfig.CronTask, + config.ShareFileName: &cronTaskConfig.Share, + config.DiscoveryConfigFilename: &cronTaskConfig.Discovery, } ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap)) ret.ctx = context.WithValue(context.Background(), "version", version.Version) diff --git a/pkg/common/cmd/friend.go b/pkg/common/cmd/friend.go index 8001165318..209d481bbe 100644 --- a/pkg/common/cmd/friend.go +++ b/pkg/common/cmd/friend.go @@ -18,6 +18,7 @@ import ( "context" "github.com/openimsdk/open-im-server/v3/internal/rpc/relation" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/startrpc" "github.com/openimsdk/open-im-server/v3/version" "github.com/openimsdk/tools/system/program" @@ -35,14 +36,14 @@ func NewFriendRpcCmd() *FriendRpcCmd { var relationConfig relation.Config ret := &FriendRpcCmd{relationConfig: &relationConfig} ret.configMap = map[string]any{ - OpenIMRPCFriendCfgFileName: &relationConfig.RpcConfig, - RedisConfigFileName: &relationConfig.RedisConfig, - MongodbConfigFileName: &relationConfig.MongodbConfig, - ShareFileName: &relationConfig.Share, - NotificationFileName: &relationConfig.NotificationConfig, - WebhooksConfigFileName: &relationConfig.WebhooksConfig, - LocalCacheConfigFileName: &relationConfig.LocalCacheConfig, - DiscoveryConfigFilename: &relationConfig.Discovery, + config.OpenIMRPCFriendCfgFileName: &relationConfig.RpcConfig, + config.RedisConfigFileName: &relationConfig.RedisConfig, + config.MongodbConfigFileName: &relationConfig.MongodbConfig, + config.ShareFileName: &relationConfig.Share, + config.NotificationFileName: &relationConfig.NotificationConfig, + config.WebhooksConfigFileName: &relationConfig.WebhooksConfig, + config.LocalCacheConfigFileName: &relationConfig.LocalCacheConfig, + config.DiscoveryConfigFilename: &relationConfig.Discovery, } ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap)) ret.ctx = context.WithValue(context.Background(), "version", version.Version) @@ -59,5 +60,16 @@ func (a *FriendRpcCmd) Exec() error { func (a *FriendRpcCmd) runE() error { return startrpc.Start(a.ctx, &a.relationConfig.Discovery, &a.relationConfig.RpcConfig.Prometheus, a.relationConfig.RpcConfig.RPC.ListenIP, a.relationConfig.RpcConfig.RPC.RegisterIP, a.relationConfig.RpcConfig.RPC.AutoSetPorts, a.relationConfig.RpcConfig.RPC.Ports, - a.Index(), a.relationConfig.Discovery.RpcService.Friend, &a.relationConfig.NotificationConfig, a.relationConfig, relation.Start) + a.Index(), a.relationConfig.Discovery.RpcService.Friend, &a.relationConfig.NotificationConfig, a.relationConfig, + []string{ + a.relationConfig.RpcConfig.GetConfigFileName(), + a.relationConfig.RedisConfig.GetConfigFileName(), + a.relationConfig.MongodbConfig.GetConfigFileName(), + a.relationConfig.NotificationConfig.GetConfigFileName(), + a.relationConfig.Share.GetConfigFileName(), + a.relationConfig.WebhooksConfig.GetConfigFileName(), + a.relationConfig.LocalCacheConfig.GetConfigFileName(), + a.relationConfig.Discovery.GetConfigFileName(), + }, + relation.Start) } diff --git a/pkg/common/cmd/group.go b/pkg/common/cmd/group.go index 4f8d17516a..23fd460f71 100644 --- a/pkg/common/cmd/group.go +++ b/pkg/common/cmd/group.go @@ -18,6 +18,7 @@ import ( "context" "github.com/openimsdk/open-im-server/v3/internal/rpc/group" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/startrpc" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/versionctx" "github.com/openimsdk/open-im-server/v3/version" @@ -36,14 +37,14 @@ func NewGroupRpcCmd() *GroupRpcCmd { var groupConfig group.Config ret := &GroupRpcCmd{groupConfig: &groupConfig} ret.configMap = map[string]any{ - OpenIMRPCGroupCfgFileName: &groupConfig.RpcConfig, - RedisConfigFileName: &groupConfig.RedisConfig, - MongodbConfigFileName: &groupConfig.MongodbConfig, - ShareFileName: &groupConfig.Share, - NotificationFileName: &groupConfig.NotificationConfig, - WebhooksConfigFileName: &groupConfig.WebhooksConfig, - LocalCacheConfigFileName: &groupConfig.LocalCacheConfig, - DiscoveryConfigFilename: &groupConfig.Discovery, + config.OpenIMRPCGroupCfgFileName: &groupConfig.RpcConfig, + config.RedisConfigFileName: &groupConfig.RedisConfig, + config.MongodbConfigFileName: &groupConfig.MongodbConfig, + config.ShareFileName: &groupConfig.Share, + config.NotificationFileName: &groupConfig.NotificationConfig, + config.WebhooksConfigFileName: &groupConfig.WebhooksConfig, + config.LocalCacheConfigFileName: &groupConfig.LocalCacheConfig, + config.DiscoveryConfigFilename: &groupConfig.Discovery, } ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap)) ret.ctx = context.WithValue(context.Background(), "version", version.Version) @@ -60,5 +61,16 @@ func (a *GroupRpcCmd) Exec() error { func (a *GroupRpcCmd) runE() error { return startrpc.Start(a.ctx, &a.groupConfig.Discovery, &a.groupConfig.RpcConfig.Prometheus, a.groupConfig.RpcConfig.RPC.ListenIP, a.groupConfig.RpcConfig.RPC.RegisterIP, a.groupConfig.RpcConfig.RPC.AutoSetPorts, a.groupConfig.RpcConfig.RPC.Ports, - a.Index(), a.groupConfig.Discovery.RpcService.Group, &a.groupConfig.NotificationConfig, a.groupConfig, group.Start, versionctx.EnableVersionCtx()) + a.Index(), a.groupConfig.Discovery.RpcService.Group, &a.groupConfig.NotificationConfig, a.groupConfig, + []string{ + a.groupConfig.RpcConfig.GetConfigFileName(), + a.groupConfig.RedisConfig.GetConfigFileName(), + a.groupConfig.MongodbConfig.GetConfigFileName(), + a.groupConfig.NotificationConfig.GetConfigFileName(), + a.groupConfig.Share.GetConfigFileName(), + a.groupConfig.WebhooksConfig.GetConfigFileName(), + a.groupConfig.LocalCacheConfig.GetConfigFileName(), + a.groupConfig.Discovery.GetConfigFileName(), + }, + group.Start, versionctx.EnableVersionCtx()) } diff --git a/pkg/common/cmd/msg.go b/pkg/common/cmd/msg.go index b6647f9c10..b6f0b6131b 100644 --- a/pkg/common/cmd/msg.go +++ b/pkg/common/cmd/msg.go @@ -18,6 +18,7 @@ import ( "context" "github.com/openimsdk/open-im-server/v3/internal/rpc/msg" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/startrpc" "github.com/openimsdk/open-im-server/v3/version" "github.com/openimsdk/tools/system/program" @@ -35,15 +36,15 @@ func NewMsgRpcCmd() *MsgRpcCmd { var msgConfig msg.Config ret := &MsgRpcCmd{msgConfig: &msgConfig} ret.configMap = map[string]any{ - OpenIMRPCMsgCfgFileName: &msgConfig.RpcConfig, - RedisConfigFileName: &msgConfig.RedisConfig, - MongodbConfigFileName: &msgConfig.MongodbConfig, - KafkaConfigFileName: &msgConfig.KafkaConfig, - ShareFileName: &msgConfig.Share, - NotificationFileName: &msgConfig.NotificationConfig, - WebhooksConfigFileName: &msgConfig.WebhooksConfig, - LocalCacheConfigFileName: &msgConfig.LocalCacheConfig, - DiscoveryConfigFilename: &msgConfig.Discovery, + config.OpenIMRPCMsgCfgFileName: &msgConfig.RpcConfig, + config.RedisConfigFileName: &msgConfig.RedisConfig, + config.MongodbConfigFileName: &msgConfig.MongodbConfig, + config.KafkaConfigFileName: &msgConfig.KafkaConfig, + config.ShareFileName: &msgConfig.Share, + config.NotificationFileName: &msgConfig.NotificationConfig, + config.WebhooksConfigFileName: &msgConfig.WebhooksConfig, + config.LocalCacheConfigFileName: &msgConfig.LocalCacheConfig, + config.DiscoveryConfigFilename: &msgConfig.Discovery, } ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap)) ret.ctx = context.WithValue(context.Background(), "version", version.Version) @@ -60,5 +61,17 @@ func (a *MsgRpcCmd) Exec() error { func (a *MsgRpcCmd) runE() error { return startrpc.Start(a.ctx, &a.msgConfig.Discovery, &a.msgConfig.RpcConfig.Prometheus, a.msgConfig.RpcConfig.RPC.ListenIP, a.msgConfig.RpcConfig.RPC.RegisterIP, a.msgConfig.RpcConfig.RPC.AutoSetPorts, a.msgConfig.RpcConfig.RPC.Ports, - a.Index(), a.msgConfig.Discovery.RpcService.Msg, &a.msgConfig.NotificationConfig, a.msgConfig, msg.Start) + a.Index(), a.msgConfig.Discovery.RpcService.Msg, &a.msgConfig.NotificationConfig, a.msgConfig, + []string{ + a.msgConfig.RpcConfig.GetConfigFileName(), + a.msgConfig.RedisConfig.GetConfigFileName(), + a.msgConfig.MongodbConfig.GetConfigFileName(), + a.msgConfig.KafkaConfig.GetConfigFileName(), + a.msgConfig.NotificationConfig.GetConfigFileName(), + a.msgConfig.Share.GetConfigFileName(), + a.msgConfig.WebhooksConfig.GetConfigFileName(), + a.msgConfig.LocalCacheConfig.GetConfigFileName(), + a.msgConfig.Discovery.GetConfigFileName(), + }, + msg.Start) } diff --git a/pkg/common/cmd/msg_gateway.go b/pkg/common/cmd/msg_gateway.go index 6363bfbf9e..3f66ea7207 100644 --- a/pkg/common/cmd/msg_gateway.go +++ b/pkg/common/cmd/msg_gateway.go @@ -18,6 +18,7 @@ import ( "context" "github.com/openimsdk/open-im-server/v3/internal/msggateway" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/version" "github.com/openimsdk/tools/system/program" @@ -35,11 +36,11 @@ func NewMsgGatewayCmd() *MsgGatewayCmd { var msgGatewayConfig msggateway.Config ret := &MsgGatewayCmd{msgGatewayConfig: &msgGatewayConfig} ret.configMap = map[string]any{ - OpenIMMsgGatewayCfgFileName: &msgGatewayConfig.MsgGateway, - ShareFileName: &msgGatewayConfig.Share, - RedisConfigFileName: &msgGatewayConfig.RedisConfig, - WebhooksConfigFileName: &msgGatewayConfig.WebhooksConfig, - DiscoveryConfigFilename: &msgGatewayConfig.Discovery, + config.OpenIMMsgGatewayCfgFileName: &msgGatewayConfig.MsgGateway, + config.ShareFileName: &msgGatewayConfig.Share, + config.RedisConfigFileName: &msgGatewayConfig.RedisConfig, + config.WebhooksConfigFileName: &msgGatewayConfig.WebhooksConfig, + config.DiscoveryConfigFilename: &msgGatewayConfig.Discovery, } ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap)) ret.ctx = context.WithValue(context.Background(), "version", version.Version) diff --git a/pkg/common/cmd/msg_transfer.go b/pkg/common/cmd/msg_transfer.go index 3643934135..fbb83c65f6 100644 --- a/pkg/common/cmd/msg_transfer.go +++ b/pkg/common/cmd/msg_transfer.go @@ -18,6 +18,7 @@ import ( "context" "github.com/openimsdk/open-im-server/v3/internal/msgtransfer" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/version" "github.com/openimsdk/tools/system/program" "github.com/spf13/cobra" @@ -34,13 +35,13 @@ func NewMsgTransferCmd() *MsgTransferCmd { var msgTransferConfig msgtransfer.Config ret := &MsgTransferCmd{msgTransferConfig: &msgTransferConfig} ret.configMap = map[string]any{ - OpenIMMsgTransferCfgFileName: &msgTransferConfig.MsgTransfer, - RedisConfigFileName: &msgTransferConfig.RedisConfig, - MongodbConfigFileName: &msgTransferConfig.MongodbConfig, - KafkaConfigFileName: &msgTransferConfig.KafkaConfig, - ShareFileName: &msgTransferConfig.Share, - WebhooksConfigFileName: &msgTransferConfig.WebhooksConfig, - DiscoveryConfigFilename: &msgTransferConfig.Discovery, + config.OpenIMMsgTransferCfgFileName: &msgTransferConfig.MsgTransfer, + config.RedisConfigFileName: &msgTransferConfig.RedisConfig, + config.MongodbConfigFileName: &msgTransferConfig.MongodbConfig, + config.KafkaConfigFileName: &msgTransferConfig.KafkaConfig, + config.ShareFileName: &msgTransferConfig.Share, + config.WebhooksConfigFileName: &msgTransferConfig.WebhooksConfig, + config.DiscoveryConfigFilename: &msgTransferConfig.Discovery, } ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap)) ret.ctx = context.WithValue(context.Background(), "version", version.Version) diff --git a/pkg/common/cmd/msg_utils.go b/pkg/common/cmd/msg_utils.go index a0a9b04101..f0e590e2ca 100644 --- a/pkg/common/cmd/msg_utils.go +++ b/pkg/common/cmd/msg_utils.go @@ -15,6 +15,7 @@ package cmd import ( + "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/spf13/cobra" ) @@ -26,11 +27,11 @@ func (m *MsgUtilsCmd) AddUserIDFlag() { m.Command.PersistentFlags().StringP("userID", "u", "", "openIM userID") } func (m *MsgUtilsCmd) AddIndexFlag() { - m.Command.PersistentFlags().IntP(FlagTransferIndex, "i", 0, "process startup sequence number") + m.Command.PersistentFlags().IntP(config.FlagTransferIndex, "i", 0, "process startup sequence number") } func (m *MsgUtilsCmd) AddConfigDirFlag() { - m.Command.PersistentFlags().StringP(FlagConf, "c", "", "path of config directory") + m.Command.PersistentFlags().StringP(config.FlagConf, "c", "", "path of config directory") } diff --git a/pkg/common/cmd/push.go b/pkg/common/cmd/push.go index 93b502cd84..41b9d56e67 100644 --- a/pkg/common/cmd/push.go +++ b/pkg/common/cmd/push.go @@ -18,6 +18,7 @@ import ( "context" "github.com/openimsdk/open-im-server/v3/internal/push" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/startrpc" "github.com/openimsdk/open-im-server/v3/version" "github.com/openimsdk/tools/system/program" @@ -35,14 +36,14 @@ func NewPushRpcCmd() *PushRpcCmd { var pushConfig push.Config ret := &PushRpcCmd{pushConfig: &pushConfig} ret.configMap = map[string]any{ - OpenIMPushCfgFileName: &pushConfig.RpcConfig, - RedisConfigFileName: &pushConfig.RedisConfig, - KafkaConfigFileName: &pushConfig.KafkaConfig, - ShareFileName: &pushConfig.Share, - NotificationFileName: &pushConfig.NotificationConfig, - WebhooksConfigFileName: &pushConfig.WebhooksConfig, - LocalCacheConfigFileName: &pushConfig.LocalCacheConfig, - DiscoveryConfigFilename: &pushConfig.Discovery, + config.OpenIMPushCfgFileName: &pushConfig.RpcConfig, + config.RedisConfigFileName: &pushConfig.RedisConfig, + config.KafkaConfigFileName: &pushConfig.KafkaConfig, + config.ShareFileName: &pushConfig.Share, + config.NotificationFileName: &pushConfig.NotificationConfig, + config.WebhooksConfigFileName: &pushConfig.WebhooksConfig, + config.LocalCacheConfigFileName: &pushConfig.LocalCacheConfig, + config.DiscoveryConfigFilename: &pushConfig.Discovery, } ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap)) ret.ctx = context.WithValue(context.Background(), "version", version.Version) @@ -60,5 +61,16 @@ func (a *PushRpcCmd) Exec() error { func (a *PushRpcCmd) runE() error { return startrpc.Start(a.ctx, &a.pushConfig.Discovery, &a.pushConfig.RpcConfig.Prometheus, a.pushConfig.RpcConfig.RPC.ListenIP, a.pushConfig.RpcConfig.RPC.RegisterIP, a.pushConfig.RpcConfig.RPC.AutoSetPorts, a.pushConfig.RpcConfig.RPC.Ports, - a.Index(), a.pushConfig.Discovery.RpcService.Push, &a.pushConfig.NotificationConfig, a.pushConfig, push.Start) + a.Index(), a.pushConfig.Discovery.RpcService.Push, &a.pushConfig.NotificationConfig, a.pushConfig, + []string{ + a.pushConfig.RpcConfig.GetConfigFileName(), + a.pushConfig.RedisConfig.GetConfigFileName(), + a.pushConfig.KafkaConfig.GetConfigFileName(), + a.pushConfig.NotificationConfig.GetConfigFileName(), + a.pushConfig.Share.GetConfigFileName(), + a.pushConfig.WebhooksConfig.GetConfigFileName(), + a.pushConfig.LocalCacheConfig.GetConfigFileName(), + a.pushConfig.Discovery.GetConfigFileName(), + }, + push.Start) } diff --git a/pkg/common/cmd/root.go b/pkg/common/cmd/root.go index 87252c133a..20db4126ef 100644 --- a/pkg/common/cmd/root.go +++ b/pkg/common/cmd/root.go @@ -1,28 +1,20 @@ -// Copyright © 2023 OpenIM. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - package cmd import ( + "context" + "encoding/json" "fmt" "github.com/openimsdk/open-im-server/v3/pkg/common/config" + kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discovery" + disetcd "github.com/openimsdk/open-im-server/v3/pkg/common/discovery/etcd" "github.com/openimsdk/open-im-server/v3/version" + "github.com/openimsdk/tools/discovery/etcd" "github.com/openimsdk/tools/errs" "github.com/openimsdk/tools/log" "github.com/openimsdk/tools/utils/runtimeenv" "github.com/spf13/cobra" + clientv3 "go.etcd.io/etcd/client/v3" ) type RootCmd struct { @@ -33,6 +25,7 @@ type RootCmd struct { log config.Log index int configPath string + etcdClient *clientv3.Client } func (r *RootCmd) ConfigPath() string { @@ -80,19 +73,43 @@ func NewRootCmd(processName string, opts ...func(*CmdOpts)) *RootCmd { SilenceUsage: true, SilenceErrors: false, } - cmd.Flags().StringP(FlagConf, "c", "", "path of config directory") - cmd.Flags().IntP(FlagTransferIndex, "i", 0, "process startup sequence number") + cmd.Flags().StringP(config.FlagConf, "c", "", "path of config directory") + cmd.Flags().IntP(config.FlagTransferIndex, "i", 0, "process startup sequence number") rootCmd.Command = cmd return rootCmd } +func (r *RootCmd) initEtcd() error { + configDirectory, _, err := r.getFlag(&r.Command) + if err != nil { + return err + } + disConfig := config.Discovery{} + env := runtimeenv.PrintRuntimeEnvironment() + err = config.Load(configDirectory, config.DiscoveryConfigFilename, config.EnvPrefixMap[config.DiscoveryConfigFilename], + env, &disConfig) + if err != nil { + return err + } + if disConfig.Enable == config.ETCD { + discov, _ := kdisc.NewDiscoveryRegister(&disConfig, env) + r.etcdClient = discov.(*etcd.SvcDiscoveryRegistryImpl).GetClient() + } + return nil +} + func (r *RootCmd) persistentPreRun(cmd *cobra.Command, opts ...func(*CmdOpts)) error { + if err := r.initEtcd(); err != nil { + return err + } cmdOpts := r.applyOptions(opts...) if err := r.initializeConfiguration(cmd, cmdOpts); err != nil { return err } - + if err := r.updateConfigFromEtcd(cmdOpts); err != nil { + return err + } if err := r.initializeLogger(cmdOpts); err != nil { return errs.WrapMsg(err, "failed to initialize logger") } @@ -111,13 +128,43 @@ func (r *RootCmd) initializeConfiguration(cmd *cobra.Command, opts *CmdOpts) err // Load common configuration file //opts.configMap[ShareFileName] = StructEnvPrefix{EnvPrefix: shareEnvPrefix, ConfigStruct: &r.share} for configFileName, configStruct := range opts.configMap { - err := config.Load(configDirectory, configFileName, ConfigEnvPrefixMap[configFileName], runtimeEnv, configStruct) + err := config.Load(configDirectory, configFileName, config.EnvPrefixMap[configFileName], runtimeEnv, configStruct) if err != nil { return err } } // Load common log configuration file - return config.Load(configDirectory, LogConfigFileName, ConfigEnvPrefixMap[LogConfigFileName], runtimeEnv, &r.log) + return config.Load(configDirectory, config.LogConfigFileName, config.EnvPrefixMap[config.LogConfigFileName], runtimeEnv, &r.log) +} + +func (r *RootCmd) updateConfigFromEtcd(opts *CmdOpts) error { + if r.etcdClient == nil { + return nil + } + + update := func(configFileName string, configStruct any) error { + key := disetcd.BuildKey(configFileName) + etcdRes, err := r.etcdClient.Get(context.TODO(), key) + if err != nil || etcdRes.Count == 0 { + return nil + } + err = json.Unmarshal(etcdRes.Kvs[0].Value, configStruct) + if err != nil { + return errs.WrapMsg(err, "failed to unmarshal config from etcd") + } + return nil + } + for configFileName, configStruct := range opts.configMap { + if err := update(configFileName, configStruct); err != nil { + return err + } + } + if err := update(config.LogConfigFileName, &r.log); err != nil { + return err + } + // Load common log configuration file + return nil + } func (r *RootCmd) applyOptions(opts ...func(*CmdOpts)) *CmdOpts { @@ -158,12 +205,12 @@ func defaultCmdOpts() *CmdOpts { } func (r *RootCmd) getFlag(cmd *cobra.Command) (string, int, error) { - configDirectory, err := cmd.Flags().GetString(FlagConf) + configDirectory, err := cmd.Flags().GetString(config.FlagConf) if err != nil { return "", 0, errs.Wrap(err) } r.configPath = configDirectory - index, err := cmd.Flags().GetInt(FlagTransferIndex) + index, err := cmd.Flags().GetInt(config.FlagTransferIndex) if err != nil { return "", 0, errs.Wrap(err) } diff --git a/pkg/common/cmd/third.go b/pkg/common/cmd/third.go index e43f61732b..5086116b54 100644 --- a/pkg/common/cmd/third.go +++ b/pkg/common/cmd/third.go @@ -18,6 +18,7 @@ import ( "context" "github.com/openimsdk/open-im-server/v3/internal/rpc/third" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/startrpc" "github.com/openimsdk/open-im-server/v3/version" "github.com/openimsdk/tools/system/program" @@ -35,14 +36,14 @@ func NewThirdRpcCmd() *ThirdRpcCmd { var thirdConfig third.Config ret := &ThirdRpcCmd{thirdConfig: &thirdConfig} ret.configMap = map[string]any{ - OpenIMRPCThirdCfgFileName: &thirdConfig.RpcConfig, - RedisConfigFileName: &thirdConfig.RedisConfig, - MongodbConfigFileName: &thirdConfig.MongodbConfig, - ShareFileName: &thirdConfig.Share, - NotificationFileName: &thirdConfig.NotificationConfig, - MinioConfigFileName: &thirdConfig.MinioConfig, - LocalCacheConfigFileName: &thirdConfig.LocalCacheConfig, - DiscoveryConfigFilename: &thirdConfig.Discovery, + config.OpenIMRPCThirdCfgFileName: &thirdConfig.RpcConfig, + config.RedisConfigFileName: &thirdConfig.RedisConfig, + config.MongodbConfigFileName: &thirdConfig.MongodbConfig, + config.ShareFileName: &thirdConfig.Share, + config.NotificationFileName: &thirdConfig.NotificationConfig, + config.MinioConfigFileName: &thirdConfig.MinioConfig, + config.LocalCacheConfigFileName: &thirdConfig.LocalCacheConfig, + config.DiscoveryConfigFilename: &thirdConfig.Discovery, } ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap)) ret.ctx = context.WithValue(context.Background(), "version", version.Version) @@ -59,5 +60,16 @@ func (a *ThirdRpcCmd) Exec() error { func (a *ThirdRpcCmd) runE() error { return startrpc.Start(a.ctx, &a.thirdConfig.Discovery, &a.thirdConfig.RpcConfig.Prometheus, a.thirdConfig.RpcConfig.RPC.ListenIP, a.thirdConfig.RpcConfig.RPC.RegisterIP, a.thirdConfig.RpcConfig.RPC.AutoSetPorts, a.thirdConfig.RpcConfig.RPC.Ports, - a.Index(), a.thirdConfig.Discovery.RpcService.Third, &a.thirdConfig.NotificationConfig, a.thirdConfig, third.Start) + a.Index(), a.thirdConfig.Discovery.RpcService.Third, &a.thirdConfig.NotificationConfig, a.thirdConfig, + []string{ + a.thirdConfig.RpcConfig.GetConfigFileName(), + a.thirdConfig.RedisConfig.GetConfigFileName(), + a.thirdConfig.MongodbConfig.GetConfigFileName(), + a.thirdConfig.NotificationConfig.GetConfigFileName(), + a.thirdConfig.Share.GetConfigFileName(), + a.thirdConfig.MinioConfig.GetConfigFileName(), + a.thirdConfig.LocalCacheConfig.GetConfigFileName(), + a.thirdConfig.Discovery.GetConfigFileName(), + }, + third.Start) } diff --git a/pkg/common/cmd/user.go b/pkg/common/cmd/user.go index dc848a7758..61125e0c37 100644 --- a/pkg/common/cmd/user.go +++ b/pkg/common/cmd/user.go @@ -18,6 +18,7 @@ import ( "context" "github.com/openimsdk/open-im-server/v3/internal/rpc/user" + "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/startrpc" "github.com/openimsdk/open-im-server/v3/version" "github.com/openimsdk/tools/system/program" @@ -35,15 +36,15 @@ func NewUserRpcCmd() *UserRpcCmd { var userConfig user.Config ret := &UserRpcCmd{userConfig: &userConfig} ret.configMap = map[string]any{ - OpenIMRPCUserCfgFileName: &userConfig.RpcConfig, - RedisConfigFileName: &userConfig.RedisConfig, - MongodbConfigFileName: &userConfig.MongodbConfig, - KafkaConfigFileName: &userConfig.KafkaConfig, - ShareFileName: &userConfig.Share, - NotificationFileName: &userConfig.NotificationConfig, - WebhooksConfigFileName: &userConfig.WebhooksConfig, - LocalCacheConfigFileName: &userConfig.LocalCacheConfig, - DiscoveryConfigFilename: &userConfig.Discovery, + config.OpenIMRPCUserCfgFileName: &userConfig.RpcConfig, + config.RedisConfigFileName: &userConfig.RedisConfig, + config.MongodbConfigFileName: &userConfig.MongodbConfig, + config.KafkaConfigFileName: &userConfig.KafkaConfig, + config.ShareFileName: &userConfig.Share, + config.NotificationFileName: &userConfig.NotificationConfig, + config.WebhooksConfigFileName: &userConfig.WebhooksConfig, + config.LocalCacheConfigFileName: &userConfig.LocalCacheConfig, + config.DiscoveryConfigFilename: &userConfig.Discovery, } ret.RootCmd = NewRootCmd(program.GetProcessName(), WithConfigMap(ret.configMap)) ret.ctx = context.WithValue(context.Background(), "version", version.Version) @@ -60,5 +61,17 @@ func (a *UserRpcCmd) Exec() error { func (a *UserRpcCmd) runE() error { return startrpc.Start(a.ctx, &a.userConfig.Discovery, &a.userConfig.RpcConfig.Prometheus, a.userConfig.RpcConfig.RPC.ListenIP, a.userConfig.RpcConfig.RPC.RegisterIP, a.userConfig.RpcConfig.RPC.AutoSetPorts, a.userConfig.RpcConfig.RPC.Ports, - a.Index(), a.userConfig.Discovery.RpcService.User, &a.userConfig.NotificationConfig, a.userConfig, user.Start) + a.Index(), a.userConfig.Discovery.RpcService.User, &a.userConfig.NotificationConfig, a.userConfig, + []string{ + a.userConfig.RpcConfig.GetConfigFileName(), + a.userConfig.RedisConfig.GetConfigFileName(), + a.userConfig.MongodbConfig.GetConfigFileName(), + a.userConfig.KafkaConfig.GetConfigFileName(), + a.userConfig.NotificationConfig.GetConfigFileName(), + a.userConfig.Share.GetConfigFileName(), + a.userConfig.WebhooksConfig.GetConfigFileName(), + a.userConfig.LocalCacheConfig.GetConfigFileName(), + a.userConfig.Discovery.GetConfigFileName(), + }, + user.Start) } diff --git a/pkg/common/config/config.go b/pkg/common/config/config.go index da11a20e72..1b9121b7a3 100644 --- a/pkg/common/config/config.go +++ b/pkg/common/config/config.go @@ -679,3 +679,220 @@ func InitNotification(notification *Notification) { notification.ConversationSetPrivate.UnreadCount = false notification.ConversationSetPrivate.ReliabilityLevel = 1 } + +type AllConfig struct { + Discovery Discovery + Kafka Kafka + LocalCache LocalCache + Log Log + Minio Minio + Mongo Mongo + Notification Notification + API API + CronTask CronTask + MsgGateway MsgGateway + MsgTransfer MsgTransfer + Push Push + Auth Auth + Conversation Conversation + Friend Friend + Group Group + Msg Msg + Third Third + User User + Redis Redis + Share Share + Webhooks Webhooks +} + +func (a *AllConfig) Name2Config(name string) any { + switch name { + case a.Discovery.GetConfigFileName(): + return a.Discovery + case a.Kafka.GetConfigFileName(): + return a.Kafka + case a.LocalCache.GetConfigFileName(): + return a.LocalCache + case a.Log.GetConfigFileName(): + return a.Log + case a.Minio.GetConfigFileName(): + return a.Minio + case a.Mongo.GetConfigFileName(): + return a.Mongo + case a.Notification.GetConfigFileName(): + return a.Notification + case a.API.GetConfigFileName(): + return a.API + case a.CronTask.GetConfigFileName(): + return a.CronTask + case a.MsgGateway.GetConfigFileName(): + return a.MsgGateway + case a.MsgTransfer.GetConfigFileName(): + return a.MsgTransfer + case a.Push.GetConfigFileName(): + return a.Push + case a.Auth.GetConfigFileName(): + return a.Auth + case a.Conversation.GetConfigFileName(): + return a.Conversation + case a.Friend.GetConfigFileName(): + return a.Friend + case a.Group.GetConfigFileName(): + return a.Group + case a.Msg.GetConfigFileName(): + return a.Msg + case a.Third.GetConfigFileName(): + return a.Third + case a.User.GetConfigFileName(): + return a.User + case a.Redis.GetConfigFileName(): + return a.Redis + case a.Share.GetConfigFileName(): + return a.Share + case a.Webhooks.GetConfigFileName(): + return a.Webhooks + default: + return nil + } +} + +func (a *AllConfig) GetConfigNames() []string { + return []string{ + a.Discovery.GetConfigFileName(), + a.Kafka.GetConfigFileName(), + a.LocalCache.GetConfigFileName(), + a.Log.GetConfigFileName(), + a.Minio.GetConfigFileName(), + a.Mongo.GetConfigFileName(), + a.Notification.GetConfigFileName(), + a.API.GetConfigFileName(), + a.CronTask.GetConfigFileName(), + a.MsgGateway.GetConfigFileName(), + a.MsgTransfer.GetConfigFileName(), + a.Push.GetConfigFileName(), + a.Auth.GetConfigFileName(), + a.Conversation.GetConfigFileName(), + a.Friend.GetConfigFileName(), + a.Group.GetConfigFileName(), + a.Msg.GetConfigFileName(), + a.Third.GetConfigFileName(), + a.User.GetConfigFileName(), + a.Redis.GetConfigFileName(), + a.Share.GetConfigFileName(), + a.Webhooks.GetConfigFileName(), + } +} + +var ( + FileName = "config.yaml" + DiscoveryConfigFilename = "discovery.yml" + KafkaConfigFileName = "kafka.yml" + LocalCacheConfigFileName = "local-cache.yml" + LogConfigFileName = "log.yml" + MinioConfigFileName = "minio.yml" + MongodbConfigFileName = "mongodb.yml" + NotificationFileName = "notification.yml" + OpenIMAPICfgFileName = "openim-api.yml" + OpenIMCronTaskCfgFileName = "openim-crontask.yml" + OpenIMMsgGatewayCfgFileName = "openim-msggateway.yml" + OpenIMMsgTransferCfgFileName = "openim-msgtransfer.yml" + OpenIMPushCfgFileName = "openim-push.yml" + OpenIMRPCAuthCfgFileName = "openim-rpc-auth.yml" + OpenIMRPCConversationCfgFileName = "openim-rpc-conversation.yml" + OpenIMRPCFriendCfgFileName = "openim-rpc-friend.yml" + OpenIMRPCGroupCfgFileName = "openim-rpc-group.yml" + OpenIMRPCMsgCfgFileName = "openim-rpc-msg.yml" + OpenIMRPCThirdCfgFileName = "openim-rpc-third.yml" + OpenIMRPCUserCfgFileName = "openim-rpc-user.yml" + RedisConfigFileName = "redis.yml" + ShareFileName = "share.yml" + WebhooksConfigFileName = "webhooks.yml" +) + +func (d *Discovery) GetConfigFileName() string { + return DiscoveryConfigFilename +} + +func (k *Kafka) GetConfigFileName() string { + return KafkaConfigFileName +} + +func (lc *LocalCache) GetConfigFileName() string { + return LocalCacheConfigFileName +} + +func (l *Log) GetConfigFileName() string { + return LogConfigFileName +} + +func (m *Minio) GetConfigFileName() string { + return MinioConfigFileName +} + +func (m *Mongo) GetConfigFileName() string { + return MongodbConfigFileName +} + +func (n *Notification) GetConfigFileName() string { + return NotificationFileName +} + +func (a *API) GetConfigFileName() string { + return OpenIMAPICfgFileName +} + +func (ct *CronTask) GetConfigFileName() string { + return OpenIMCronTaskCfgFileName +} + +func (mg *MsgGateway) GetConfigFileName() string { + return OpenIMMsgGatewayCfgFileName +} + +func (mt *MsgTransfer) GetConfigFileName() string { + return OpenIMMsgTransferCfgFileName +} + +func (p *Push) GetConfigFileName() string { + return OpenIMPushCfgFileName +} + +func (a *Auth) GetConfigFileName() string { + return OpenIMRPCAuthCfgFileName +} + +func (c *Conversation) GetConfigFileName() string { + return OpenIMRPCConversationCfgFileName +} + +func (f *Friend) GetConfigFileName() string { + return OpenIMRPCFriendCfgFileName +} + +func (g *Group) GetConfigFileName() string { + return OpenIMRPCGroupCfgFileName +} + +func (m *Msg) GetConfigFileName() string { + return OpenIMRPCMsgCfgFileName +} + +func (t *Third) GetConfigFileName() string { + return OpenIMRPCThirdCfgFileName +} + +func (u *User) GetConfigFileName() string { + return OpenIMRPCUserCfgFileName +} + +func (r *Redis) GetConfigFileName() string { + return RedisConfigFileName +} + +func (s *Share) GetConfigFileName() string { + return ShareFileName +} + +func (w *Webhooks) GetConfigFileName() string { + return WebhooksConfigFileName +} diff --git a/pkg/common/config/env.go b/pkg/common/config/env.go new file mode 100644 index 0000000000..99ccb3ca03 --- /dev/null +++ b/pkg/common/config/env.go @@ -0,0 +1,30 @@ +package config + +import "strings" + +var EnvPrefixMap map[string]string + +func init() { + EnvPrefixMap = make(map[string]string) + fileNames := []string{ + FileName, NotificationFileName, ShareFileName, WebhooksConfigFileName, + KafkaConfigFileName, RedisConfigFileName, + MongodbConfigFileName, MinioConfigFileName, LogConfigFileName, + OpenIMAPICfgFileName, OpenIMCronTaskCfgFileName, OpenIMMsgGatewayCfgFileName, + OpenIMMsgTransferCfgFileName, OpenIMPushCfgFileName, OpenIMRPCAuthCfgFileName, + OpenIMRPCConversationCfgFileName, OpenIMRPCFriendCfgFileName, OpenIMRPCGroupCfgFileName, + OpenIMRPCMsgCfgFileName, OpenIMRPCThirdCfgFileName, OpenIMRPCUserCfgFileName, DiscoveryConfigFilename, + } + + for _, fileName := range fileNames { + envKey := strings.TrimSuffix(strings.TrimSuffix(fileName, ".yml"), ".yaml") + envKey = "IMENV_" + envKey + envKey = strings.ToUpper(strings.ReplaceAll(envKey, "-", "_")) + EnvPrefixMap[fileName] = envKey + } +} + +const ( + FlagConf = "config_folder_path" + FlagTransferIndex = "index" +) diff --git a/pkg/common/config/load_config_test.go b/pkg/common/config/load_config_test.go index e575b27a9d..612d443355 100644 --- a/pkg/common/config/load_config_test.go +++ b/pkg/common/config/load_config_test.go @@ -23,7 +23,7 @@ func TestLoadMongoConfig(t *testing.T) { // os.Setenv("IMENV_MONGODB_URI", "openIM123") // os.Setenv("IMENV_MONGODB_USERNAME", "openIM123") err := Load("../../../config/", "mongodb.yml", "IMENV_MONGODB", "source", &mongo) - // err := LoadConfig("../../../config/mongodb.yml", "IMENV_MONGODB", &mongo) + // err := LoadApiConfig("../../../config/mongodb.yml", "IMENV_MONGODB", &mongo) assert.Nil(t, err) t.Log(mongo.Password) diff --git a/pkg/common/config/parse.go b/pkg/common/config/parse.go index ea62c1de0c..8fa8812ef2 100644 --- a/pkg/common/config/parse.go +++ b/pkg/common/config/parse.go @@ -26,9 +26,7 @@ import ( ) const ( - FileName = "config.yaml" - NotificationFileName = "notification.yaml" - DefaultFolderPath = "../config/" + DefaultFolderPath = "../config/" ) // return absolude path join ../config/, this is k8s container config path. diff --git a/pkg/common/discoveryregister/direct/direct_resolver.go b/pkg/common/discovery/direct/direct_resolver.go similarity index 100% rename from pkg/common/discoveryregister/direct/direct_resolver.go rename to pkg/common/discovery/direct/direct_resolver.go diff --git a/pkg/common/discoveryregister/direct/directconn.go b/pkg/common/discovery/direct/directconn.go similarity index 100% rename from pkg/common/discoveryregister/direct/directconn.go rename to pkg/common/discovery/direct/directconn.go diff --git a/pkg/common/discoveryregister/direct/doc.go b/pkg/common/discovery/direct/doc.go similarity index 94% rename from pkg/common/discoveryregister/direct/doc.go rename to pkg/common/discovery/direct/doc.go index b3cd0f804b..0ba0d14371 100644 --- a/pkg/common/discoveryregister/direct/doc.go +++ b/pkg/common/discovery/direct/doc.go @@ -12,4 +12,4 @@ // See the License for the specific language governing permissions and // limitations under the License. -package direct // import "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister/direct" +package direct // import "github.com/openimsdk/open-im-server/v3/pkg/common/discovery/direct" diff --git a/pkg/common/discoveryregister/discoveryregister.go b/pkg/common/discovery/discoveryregister.go similarity index 98% rename from pkg/common/discoveryregister/discoveryregister.go rename to pkg/common/discovery/discoveryregister.go index ae4229e1b5..bc9fd0f5af 100644 --- a/pkg/common/discoveryregister/discoveryregister.go +++ b/pkg/common/discovery/discoveryregister.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package discoveryregister +package discovery import ( "time" diff --git a/pkg/common/discoveryregister/discoveryregister_test.go b/pkg/common/discovery/discoveryregister_test.go similarity index 98% rename from pkg/common/discoveryregister/discoveryregister_test.go rename to pkg/common/discovery/discoveryregister_test.go index 4172266451..63f7e94cd5 100644 --- a/pkg/common/discoveryregister/discoveryregister_test.go +++ b/pkg/common/discovery/discoveryregister_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package discoveryregister +package discovery import ( "os" diff --git a/pkg/common/discoveryregister/doc.go b/pkg/common/discovery/doc.go similarity index 85% rename from pkg/common/discoveryregister/doc.go rename to pkg/common/discovery/doc.go index 46bbe70012..7a5918cfae 100644 --- a/pkg/common/discoveryregister/doc.go +++ b/pkg/common/discovery/doc.go @@ -12,4 +12,4 @@ // See the License for the specific language governing permissions and // limitations under the License. -package discoveryregister // import "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister" +package discovery // import "github.com/openimsdk/open-im-server/v3/pkg/common/discovery" diff --git a/pkg/common/discovery/etcd/config_manager.go b/pkg/common/discovery/etcd/config_manager.go new file mode 100644 index 0000000000..013e2cce33 --- /dev/null +++ b/pkg/common/discovery/etcd/config_manager.go @@ -0,0 +1,111 @@ +package etcd + +import ( + "context" + "os" + "os/exec" + "runtime" + "sync" + "syscall" + + "github.com/openimsdk/tools/errs" + "github.com/openimsdk/tools/log" + "github.com/openimsdk/tools/utils/datautil" + clientv3 "go.etcd.io/etcd/client/v3" +) + +const ( + ConfigKeyPrefix = "/open-im/config/" + RestartKey = "restart" +) + +var ( + ShutDowns []func() error +) + +func RegisterShutDown(shutDown ...func() error) { + ShutDowns = append(ShutDowns, shutDown...) +} + +type ConfigManager struct { + client *clientv3.Client + watchConfigNames []string + lock sync.Mutex +} + +func BuildKey(s string) string { + return ConfigKeyPrefix + s +} + +func NewConfigManager(client *clientv3.Client, configNames []string) *ConfigManager { + return &ConfigManager{ + client: client, + watchConfigNames: datautil.Batch(func(s string) string { return BuildKey(s) }, append(configNames, RestartKey))} +} + +func (c *ConfigManager) Watch(ctx context.Context) { + chans := make([]clientv3.WatchChan, 0, len(c.watchConfigNames)) + for _, name := range c.watchConfigNames { + chans = append(chans, c.client.Watch(ctx, name, clientv3.WithPrefix())) + } + + doWatch := func(watchChan clientv3.WatchChan) { + for watchResp := range watchChan { + if watchResp.Err() != nil { + log.ZError(ctx, "watch err", errs.Wrap(watchResp.Err())) + continue + } + for _, event := range watchResp.Events { + if event.IsModify() { + if datautil.Contain(string(event.Kv.Key), c.watchConfigNames...) { + c.lock.Lock() + err := restartServer(ctx) + if err != nil { + log.ZError(ctx, "restart server err", err) + } + c.lock.Unlock() + } + } + } + } + } + for _, ch := range chans { + go doWatch(ch) + } +} + +func restartServer(ctx context.Context) error { + exePath, err := os.Executable() + if err != nil { + return errs.New("get executable path fail").Wrap() + } + + args := os.Args + env := os.Environ() + + cmd := exec.Command(exePath, args[1:]...) + cmd.Env = env + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + cmd.Stdin = os.Stdin + + if runtime.GOOS != "windows" { + cmd.SysProcAttr = &syscall.SysProcAttr{} + } + log.ZInfo(ctx, "shutdown server") + for _, f := range ShutDowns { + if err = f(); err != nil { + log.ZError(ctx, "shutdown fail", err) + } + } + + log.ZInfo(ctx, "restart server") + err = cmd.Start() + if err != nil { + return errs.New("restart server fail").Wrap() + } + log.ZInfo(ctx, "cmd start over") + + os.Exit(0) + return nil +} diff --git a/pkg/common/discoveryregister/kubernetes/doc.go b/pkg/common/discovery/etcd/doc.go similarity index 84% rename from pkg/common/discoveryregister/kubernetes/doc.go rename to pkg/common/discovery/etcd/doc.go index 8615caa6b6..fedf5ad511 100644 --- a/pkg/common/discoveryregister/kubernetes/doc.go +++ b/pkg/common/discovery/etcd/doc.go @@ -12,4 +12,4 @@ // See the License for the specific language governing permissions and // limitations under the License. -package kubernetes // import "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister/kubernetes" +package etcd // import "github.com/openimsdk/open-im-server/v3/pkg/common/discovery/etcd" diff --git a/pkg/common/discoveryregister/etcd/doc.go b/pkg/common/discovery/kubernetes/doc.go similarity index 94% rename from pkg/common/discoveryregister/etcd/doc.go rename to pkg/common/discovery/kubernetes/doc.go index 1da7508a16..dc23ac132f 100644 --- a/pkg/common/discoveryregister/etcd/doc.go +++ b/pkg/common/discovery/kubernetes/doc.go @@ -12,4 +12,4 @@ // See the License for the specific language governing permissions and // limitations under the License. -package kubernetes // import "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister/etcd" +package kubernetes // import "github.com/openimsdk/open-im-server/v3/pkg/common/discovery/kubernetes" diff --git a/pkg/common/discoveryregister/kubernetes/kubernetes.go b/pkg/common/discovery/kubernetes/kubernetes.go similarity index 100% rename from pkg/common/discoveryregister/kubernetes/kubernetes.go rename to pkg/common/discovery/kubernetes/kubernetes.go diff --git a/pkg/common/startrpc/start.go b/pkg/common/startrpc/start.go index fd46fb45c4..5facc8f730 100644 --- a/pkg/common/startrpc/start.go +++ b/pkg/common/startrpc/start.go @@ -27,6 +27,7 @@ import ( "time" conf "github.com/openimsdk/open-im-server/v3/pkg/common/config" + disetcd "github.com/openimsdk/open-im-server/v3/pkg/common/discovery/etcd" "github.com/openimsdk/tools/discovery/etcd" "github.com/openimsdk/tools/utils/datautil" "github.com/openimsdk/tools/utils/jsonutil" @@ -34,7 +35,7 @@ import ( "github.com/openimsdk/tools/utils/runtimeenv" - kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister" + kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discovery" "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" "github.com/openimsdk/tools/discovery" "github.com/openimsdk/tools/errs" @@ -48,9 +49,11 @@ import ( // Start rpc server. func Start[T any](ctx context.Context, discovery *conf.Discovery, prometheusConfig *conf.Prometheus, listenIP, registerIP string, autoSetPorts bool, rpcPorts []int, index int, rpcRegisterName string, notification *conf.Notification, config T, + watchConfigNames []string, rpcFn func(ctx context.Context, config T, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error, options ...grpc.ServerOption) error { + watchConfigNames = append(watchConfigNames, conf.LogConfigFileName) var ( rpcTcpAddr string netDone = make(chan struct{}, 2) @@ -185,12 +188,17 @@ func Start[T any](ctx context.Context, discovery *conf.Discovery, prometheusConf go func() { err := srv.Serve(listener) - if err != nil { + if err != nil && !errors.Is(err, http.ErrServerClosed) { netErr = errs.WrapMsg(err, "rpc start err: ", rpcTcpAddr) netDone <- struct{}{} } }() + if discovery.Enable == conf.ETCD { + cm := disetcd.NewConfigManager(client.(*etcd.SvcDiscoveryRegistryImpl).GetClient(), watchConfigNames) + cm.Watch(ctx) + } + sigs := make(chan os.Signal, 1) signal.Notify(sigs, syscall.SIGTERM) select { diff --git a/tools/check-component/main.go b/tools/check-component/main.go index 15d8640c70..9df0da7de3 100644 --- a/tools/check-component/main.go +++ b/tools/check-component/main.go @@ -24,7 +24,6 @@ import ( "path/filepath" "time" - "github.com/openimsdk/open-im-server/v3/pkg/common/cmd" "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/tools/db/mongoutil" "github.com/openimsdk/tools/db/redisutil" @@ -87,35 +86,35 @@ func initConfig(configDir string) (*config.Mongo, *config.Redis, *config.Kafka, ) runtimeEnv := runtimeenv.PrintRuntimeEnvironment() - err := config.Load(configDir, cmd.MongodbConfigFileName, cmd.ConfigEnvPrefixMap[cmd.MongodbConfigFileName], runtimeEnv, mongoConfig) + err := config.Load(configDir, config.MongodbConfigFileName, config.EnvPrefixMap[config.MongodbConfigFileName], runtimeEnv, mongoConfig) if err != nil { return nil, nil, nil, nil, nil, err } - err = config.Load(configDir, cmd.RedisConfigFileName, cmd.ConfigEnvPrefixMap[cmd.RedisConfigFileName], runtimeEnv, redisConfig) + err = config.Load(configDir, config.RedisConfigFileName, config.EnvPrefixMap[config.RedisConfigFileName], runtimeEnv, redisConfig) if err != nil { return nil, nil, nil, nil, nil, err } - err = config.Load(configDir, cmd.KafkaConfigFileName, cmd.ConfigEnvPrefixMap[cmd.KafkaConfigFileName], runtimeEnv, kafkaConfig) + err = config.Load(configDir, config.KafkaConfigFileName, config.EnvPrefixMap[config.KafkaConfigFileName], runtimeEnv, kafkaConfig) if err != nil { return nil, nil, nil, nil, nil, err } - err = config.Load(configDir, cmd.OpenIMRPCThirdCfgFileName, cmd.ConfigEnvPrefixMap[cmd.OpenIMRPCThirdCfgFileName], runtimeEnv, thirdConfig) + err = config.Load(configDir, config.OpenIMRPCThirdCfgFileName, config.EnvPrefixMap[config.OpenIMRPCThirdCfgFileName], runtimeEnv, thirdConfig) if err != nil { return nil, nil, nil, nil, nil, err } if thirdConfig.Object.Enable == "minio" { - err = config.Load(configDir, cmd.MinioConfigFileName, cmd.ConfigEnvPrefixMap[cmd.MinioConfigFileName], runtimeEnv, minioConfig) + err = config.Load(configDir, config.MinioConfigFileName, config.EnvPrefixMap[config.MinioConfigFileName], runtimeEnv, minioConfig) if err != nil { return nil, nil, nil, nil, nil, err } } else { minioConfig = nil } - err = config.Load(configDir, cmd.DiscoveryConfigFilename, cmd.ConfigEnvPrefixMap[cmd.DiscoveryConfigFilename], runtimeEnv, discovery) + err = config.Load(configDir, config.DiscoveryConfigFilename, config.EnvPrefixMap[config.DiscoveryConfigFilename], runtimeEnv, discovery) if err != nil { return nil, nil, nil, nil, nil, err } diff --git a/tools/seq/internal/seq.go b/tools/seq/internal/seq.go index 62466670ec..c931cda5d3 100644 --- a/tools/seq/internal/seq.go +++ b/tools/seq/internal/seq.go @@ -15,7 +15,6 @@ import ( "syscall" "time" - "github.com/openimsdk/open-im-server/v3/pkg/common/cmd" "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/database/mgo" "github.com/openimsdk/tools/db/mongoutil" @@ -59,11 +58,11 @@ func readConfig[T any](dir string, name string) (*T, error) { } func Main(conf string, del time.Duration) error { - redisConfig, err := readConfig[config.Redis](conf, cmd.RedisConfigFileName) + redisConfig, err := readConfig[config.Redis](conf, config.RedisConfigFileName) if err != nil { return err } - mongodbConfig, err := readConfig[config.Mongo](conf, cmd.MongodbConfigFileName) + mongodbConfig, err := readConfig[config.Mongo](conf, config.MongodbConfigFileName) if err != nil { return err }