Skip to content

Commit

Permalink
feat: config center (#619)
Browse files Browse the repository at this point in the history
* feat: config center

* feat: config center
  • Loading branch information
icey-yu authored Dec 27, 2024
1 parent 12f642b commit 06353ce
Show file tree
Hide file tree
Showing 15 changed files with 669 additions and 90 deletions.
202 changes: 202 additions & 0 deletions internal/api/admin/config_manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
package admin

import (
"encoding/json"
"reflect"
"strconv"
"time"

"github.com/gin-gonic/gin"
"github.com/openimsdk/chat/pkg/common/apistruct"
"github.com/openimsdk/chat/pkg/common/config"
"github.com/openimsdk/chat/pkg/common/kdisc"
"github.com/openimsdk/chat/pkg/common/kdisc/etcd"
"github.com/openimsdk/chat/version"
"github.com/openimsdk/tools/apiresp"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/utils/runtimeenv"
clientv3 "go.etcd.io/etcd/client/v3"
)

type ConfigManager struct {
config *config.AllConfig
client *clientv3.Client
configPath string
runtimeEnv string
}

func NewConfigManager(cfg *config.AllConfig, client *clientv3.Client, configPath string, runtimeEnv string) *ConfigManager {
return &ConfigManager{
config: cfg,
client: client,
configPath: configPath,
runtimeEnv: runtimeEnv,
}
}

func (cm *ConfigManager) GetConfig(c *gin.Context) {
var req apistruct.GetConfigReq
if err := c.BindJSON(&req); err != nil {
apiresp.GinError(c, errs.ErrArgs.WithDetail(err.Error()).Wrap())
return
}
conf := cm.config.Name2Config(req.ConfigName)
if conf == nil {
apiresp.GinError(c, errs.ErrArgs.WithDetail("config name not found").Wrap())
return
}
b, err := json.Marshal(conf)
if err != nil {
apiresp.GinError(c, err)
return
}
apiresp.GinSuccess(c, string(b))
}

func (cm *ConfigManager) GetConfigList(c *gin.Context) {
var resp apistruct.GetConfigListResp
resp.ConfigNames = cm.config.GetConfigNames()
resp.Environment = runtimeenv.PrintRuntimeEnvironment()
resp.Version = version.Version

apiresp.GinSuccess(c, resp)
}

func (cm *ConfigManager) SetConfig(c *gin.Context) {
if cm.config.Discovery.Enable != kdisc.ETCDCONST {
apiresp.GinError(c, errs.New("only etcd support set config").Wrap())
return
}
var req apistruct.SetConfigReq
if err := c.BindJSON(&req); err != nil {
apiresp.GinError(c, errs.ErrArgs.WithDetail(err.Error()).Wrap())
return
}
var err error
switch req.ConfigName {
case config.DiscoveryConfigFileName:
err = compareAndSave[config.Discovery](c, cm.config.Name2Config(req.ConfigName), &req, cm.client)
case config.LogConfigFileName:
err = compareAndSave[config.Log](c, cm.config.Name2Config(req.ConfigName), &req, cm.client)
case config.MongodbConfigFileName:
err = compareAndSave[config.Mongo](c, cm.config.Name2Config(req.ConfigName), &req, cm.client)
case config.ChatAPIAdminCfgFileName:
err = compareAndSave[config.API](c, cm.config.Name2Config(req.ConfigName), &req, cm.client)
case config.ChatAPIChatCfgFileName:
err = compareAndSave[config.API](c, cm.config.Name2Config(req.ConfigName), &req, cm.client)
case config.ChatRPCAdminCfgFileName:
err = compareAndSave[config.Admin](c, cm.config.Name2Config(req.ConfigName), &req, cm.client)
case config.ChatRPCChatCfgFileName:
err = compareAndSave[config.Chat](c, cm.config.Name2Config(req.ConfigName), &req, cm.client)
case config.ShareFileName:
err = compareAndSave[config.Share](c, cm.config.Name2Config(req.ConfigName), &req, cm.client)
case config.RedisConfigFileName:
err = compareAndSave[config.Redis](c, cm.config.Name2Config(req.ConfigName), &req, cm.client)
default:
apiresp.GinError(c, errs.ErrArgs.Wrap())
return
}
if err != nil {
apiresp.GinError(c, errs.ErrArgs.WithDetail(err.Error()).Wrap())
return
}
apiresp.GinSuccess(c, nil)
}

func compareAndSave[T any](c *gin.Context, old any, req *apistruct.SetConfigReq, client *clientv3.Client) error {
conf := new(T)
err := json.Unmarshal([]byte(req.Data), &conf)
if err != nil {
return errs.ErrArgs.WithDetail(err.Error()).Wrap()
}
eq := reflect.DeepEqual(old, conf)
if eq {
return nil
}
data, err := json.Marshal(conf)
if err != nil {
return errs.ErrArgs.WithDetail(err.Error()).Wrap()
}
_, err = client.Put(c, etcd.BuildKey(req.ConfigName), string(data))
if err != nil {
return errs.WrapMsg(err, "save to etcd failed")
}
return nil
}

func (cm *ConfigManager) ResetConfig(c *gin.Context) {
go cm.resetConfig(c)
apiresp.GinSuccess(c, nil)
}

func (cm *ConfigManager) resetConfig(c *gin.Context) {
txn := cm.client.Txn(c)
type initConf struct {
old any
new any
isChanged bool
}
configMap := map[string]*initConf{
config.DiscoveryConfigFileName: {old: &cm.config.Discovery, new: new(config.Discovery)},
config.LogConfigFileName: {old: &cm.config.Log, new: new(config.Log)},
config.MongodbConfigFileName: {old: &cm.config.Mongo, new: new(config.Mongo)},
config.ChatAPIAdminCfgFileName: {old: &cm.config.AdminAPI, new: new(config.API)},
config.ChatAPIChatCfgFileName: {old: &cm.config.ChatAPI, new: new(config.API)},
config.ChatRPCAdminCfgFileName: {old: &cm.config.Admin, new: new(config.Admin)},
config.ChatRPCChatCfgFileName: {old: &cm.config.Chat, new: new(config.Chat)},
config.RedisConfigFileName: {old: &cm.config.Redis, new: new(config.Redis)},
config.ShareFileName: {old: &cm.config.Share, new: new(config.Share)},
}

changedKeys := make([]string, 0, len(configMap))
for k, v := range configMap {
err := config.Load(
cm.configPath,
k,
config.EnvPrefixMap[k],
cm.runtimeEnv,
v.new,
)
if err != nil {
log.ZError(c, "load config failed", err)
continue
}
v.isChanged = reflect.DeepEqual(v.old, v.new)
if !v.isChanged {
changedKeys = append(changedKeys, k)
}
}

ops := make([]clientv3.Op, 0)
for _, k := range changedKeys {
data, err := json.Marshal(configMap[k].new)
if err != nil {
log.ZError(c, "marshal config failed", err)
continue
}
ops = append(ops, clientv3.OpPut(etcd.BuildKey(k), string(data)))
}
if len(ops) > 0 {
txn.Then(ops...)
_, err := txn.Commit()
if err != nil {
log.ZError(c, "commit etcd txn failed", err)
return
}
}
}

func (cm *ConfigManager) Restart(c *gin.Context) {
go cm.restart(c)
apiresp.GinSuccess(c, nil)
}

func (cm *ConfigManager) restart(c *gin.Context) {
time.Sleep(time.Millisecond * 200) // wait for Restart http call return
t := time.Now().Unix()
_, err := cm.client.Put(c, etcd.BuildKey(etcd.RestartKey), strconv.Itoa(int(t)))
if err != nil {
log.ZError(c, "restart etcd put key failed", err)
}
}
84 changes: 74 additions & 10 deletions internal/api/admin/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,32 +2,40 @@ package admin

import (
"context"
"errors"
"fmt"
"net/http"
"os"
"os/signal"
"syscall"
"time"

"github.com/gin-gonic/gin"
chatmw "github.com/openimsdk/chat/internal/api/mw"
"github.com/openimsdk/chat/internal/api/util"
"github.com/openimsdk/chat/pkg/common/config"
"github.com/openimsdk/chat/pkg/common/imapi"
"github.com/openimsdk/chat/pkg/common/kdisc"
disetcd "github.com/openimsdk/chat/pkg/common/kdisc/etcd"
adminclient "github.com/openimsdk/chat/pkg/protocol/admin"
chatclient "github.com/openimsdk/chat/pkg/protocol/chat"
"github.com/openimsdk/tools/discovery"
"github.com/openimsdk/tools/discovery/etcd"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/mw"
"github.com/openimsdk/tools/system/program"
"github.com/openimsdk/tools/utils/datautil"
"github.com/openimsdk/tools/utils/runtimeenv"
clientv3 "go.etcd.io/etcd/client/v3"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"

"github.com/openimsdk/tools/utils/runtimeenv"
)

type Config struct {
ApiConfig config.API

Discovery config.Discovery
Share config.Share
*config.AllConfig

RuntimeEnv string
ConfigPath string
}

func Start(ctx context.Context, index int, config *Config) error {
Expand All @@ -36,7 +44,7 @@ func Start(ctx context.Context, index int, config *Config) error {
if len(config.Share.ChatAdmin) == 0 {
return errs.New("share chat admin not configured")
}
apiPort, err := datautil.GetElemByIndex(config.ApiConfig.Api.Ports, index)
apiPort, err := datautil.GetElemByIndex(config.AdminAPI.Api.Ports, index)
if err != nil {
return err
}
Expand Down Expand Up @@ -66,11 +74,51 @@ func Start(ctx context.Context, index int, config *Config) error {
gin.SetMode(gin.ReleaseMode)
engine := gin.New()
engine.Use(gin.Recovery(), mw.CorsHandler(), mw.GinParseOperationID())
SetAdminRoute(engine, adminApi, mwApi)
return engine.Run(fmt.Sprintf(":%d", apiPort))
SetAdminRoute(engine, adminApi, mwApi, config, client)

if config.Discovery.Enable == kdisc.ETCDCONST {
cm := disetcd.NewConfigManager(client.(*etcd.SvcDiscoveryRegistryImpl).GetClient(), config.GetConfigNames())
cm.Watch(ctx)
}
var (
netDone = make(chan struct{}, 1)
netErr error
)
server := http.Server{Addr: fmt.Sprintf(":%d", apiPort), Handler: engine}
go func() {
err = server.ListenAndServe()
if err != nil && !errors.Is(err, http.ErrServerClosed) {
netErr = errs.WrapMsg(err, fmt.Sprintf("api start err: %s", server.Addr))
netDone <- struct{}{}
}
}()
shutdown := func() error {
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()
err := server.Shutdown(ctx)
if err != nil {
return errs.WrapMsg(err, "shutdown err")
}
return nil
}
disetcd.RegisterShutDown(shutdown)

sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGTERM)
select {
case <-sigs:
program.SIGTERMExit()
if err := shutdown(); err != nil {
return err
}
case <-netDone:
close(netDone)
return netErr
}
return nil
}

func SetAdminRoute(router gin.IRouter, admin *Api, mw *chatmw.MW) {
func SetAdminRoute(router gin.IRouter, admin *Api, mw *chatmw.MW, cfg *Config, client discovery.SvcDiscoveryRegistry) {

adminRouterGroup := router.Group("/account")
adminRouterGroup.POST("/login", admin.AdminLogin) // Login
Expand Down Expand Up @@ -149,4 +197,20 @@ func SetAdminRoute(router gin.IRouter, admin *Api, mw *chatmw.MW) {
applicationGroup.POST("/delete_version", mw.CheckAdmin, admin.DeleteApplicationVersion)
applicationGroup.POST("/latest_version", admin.LatestApplicationVersion)
applicationGroup.POST("/page_versions", admin.PageApplicationVersion)

var etcdClient *clientv3.Client
if cfg.Discovery.Enable == kdisc.ETCDCONST {
etcdClient = client.(*etcd.SvcDiscoveryRegistryImpl).GetClient()
}
cm := NewConfigManager(cfg.AllConfig, etcdClient, cfg.ConfigPath, cfg.RuntimeEnv)
{
configGroup := router.Group("/config", mw.CheckAdmin)
configGroup.POST("/get_config_list", cm.GetConfigList)
configGroup.POST("/get_config", cm.GetConfig)
configGroup.POST("/set_config", cm.SetConfig)
configGroup.POST("/reset_config", cm.ResetConfig)
}
{
router.POST("/restart", mw.CheckAdmin, cm.Restart)
}
}
Loading

0 comments on commit 06353ce

Please sign in to comment.