Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: config center #619

Merged
merged 2 commits into from
Dec 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
202 changes: 202 additions & 0 deletions internal/api/admin/config_manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
package admin

import (
"encoding/json"
"reflect"
"strconv"
"time"

"github.com/gin-gonic/gin"
"github.com/openimsdk/chat/pkg/common/apistruct"
"github.com/openimsdk/chat/pkg/common/config"
"github.com/openimsdk/chat/pkg/common/kdisc"
"github.com/openimsdk/chat/pkg/common/kdisc/etcd"
"github.com/openimsdk/chat/version"
"github.com/openimsdk/tools/apiresp"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/utils/runtimeenv"
clientv3 "go.etcd.io/etcd/client/v3"
)

type ConfigManager struct {
config *config.AllConfig
client *clientv3.Client
configPath string
runtimeEnv string
}

func NewConfigManager(cfg *config.AllConfig, client *clientv3.Client, configPath string, runtimeEnv string) *ConfigManager {
return &ConfigManager{
config: cfg,
client: client,
configPath: configPath,
runtimeEnv: runtimeEnv,
}
}

func (cm *ConfigManager) GetConfig(c *gin.Context) {
var req apistruct.GetConfigReq
if err := c.BindJSON(&req); err != nil {
apiresp.GinError(c, errs.ErrArgs.WithDetail(err.Error()).Wrap())
return
}
conf := cm.config.Name2Config(req.ConfigName)
if conf == nil {
apiresp.GinError(c, errs.ErrArgs.WithDetail("config name not found").Wrap())
return
}
b, err := json.Marshal(conf)
if err != nil {
apiresp.GinError(c, err)
return
}
apiresp.GinSuccess(c, string(b))
}

func (cm *ConfigManager) GetConfigList(c *gin.Context) {
var resp apistruct.GetConfigListResp
resp.ConfigNames = cm.config.GetConfigNames()
resp.Environment = runtimeenv.PrintRuntimeEnvironment()
resp.Version = version.Version

apiresp.GinSuccess(c, resp)
}

func (cm *ConfigManager) SetConfig(c *gin.Context) {
if cm.config.Discovery.Enable != kdisc.ETCDCONST {
apiresp.GinError(c, errs.New("only etcd support set config").Wrap())
return
}
var req apistruct.SetConfigReq
if err := c.BindJSON(&req); err != nil {
apiresp.GinError(c, errs.ErrArgs.WithDetail(err.Error()).Wrap())
return
}
var err error
switch req.ConfigName {
case config.DiscoveryConfigFileName:
err = compareAndSave[config.Discovery](c, cm.config.Name2Config(req.ConfigName), &req, cm.client)
case config.LogConfigFileName:
err = compareAndSave[config.Log](c, cm.config.Name2Config(req.ConfigName), &req, cm.client)
case config.MongodbConfigFileName:
err = compareAndSave[config.Mongo](c, cm.config.Name2Config(req.ConfigName), &req, cm.client)
case config.ChatAPIAdminCfgFileName:
err = compareAndSave[config.API](c, cm.config.Name2Config(req.ConfigName), &req, cm.client)
case config.ChatAPIChatCfgFileName:
err = compareAndSave[config.API](c, cm.config.Name2Config(req.ConfigName), &req, cm.client)
case config.ChatRPCAdminCfgFileName:
err = compareAndSave[config.Admin](c, cm.config.Name2Config(req.ConfigName), &req, cm.client)
case config.ChatRPCChatCfgFileName:
err = compareAndSave[config.Chat](c, cm.config.Name2Config(req.ConfigName), &req, cm.client)
case config.ShareFileName:
err = compareAndSave[config.Share](c, cm.config.Name2Config(req.ConfigName), &req, cm.client)
case config.RedisConfigFileName:
err = compareAndSave[config.Redis](c, cm.config.Name2Config(req.ConfigName), &req, cm.client)
default:
apiresp.GinError(c, errs.ErrArgs.Wrap())
return
}
if err != nil {
apiresp.GinError(c, errs.ErrArgs.WithDetail(err.Error()).Wrap())
return
}
apiresp.GinSuccess(c, nil)
}

func compareAndSave[T any](c *gin.Context, old any, req *apistruct.SetConfigReq, client *clientv3.Client) error {
conf := new(T)
err := json.Unmarshal([]byte(req.Data), &conf)
if err != nil {
return errs.ErrArgs.WithDetail(err.Error()).Wrap()
}
eq := reflect.DeepEqual(old, conf)
if eq {
return nil
}
data, err := json.Marshal(conf)
if err != nil {
return errs.ErrArgs.WithDetail(err.Error()).Wrap()
}
_, err = client.Put(c, etcd.BuildKey(req.ConfigName), string(data))
if err != nil {
return errs.WrapMsg(err, "save to etcd failed")
}
return nil
}

func (cm *ConfigManager) ResetConfig(c *gin.Context) {
go cm.resetConfig(c)
apiresp.GinSuccess(c, nil)
}

func (cm *ConfigManager) resetConfig(c *gin.Context) {
txn := cm.client.Txn(c)
type initConf struct {
old any
new any
isChanged bool
}
configMap := map[string]*initConf{
config.DiscoveryConfigFileName: {old: &cm.config.Discovery, new: new(config.Discovery)},
config.LogConfigFileName: {old: &cm.config.Log, new: new(config.Log)},
config.MongodbConfigFileName: {old: &cm.config.Mongo, new: new(config.Mongo)},
config.ChatAPIAdminCfgFileName: {old: &cm.config.AdminAPI, new: new(config.API)},
config.ChatAPIChatCfgFileName: {old: &cm.config.ChatAPI, new: new(config.API)},
config.ChatRPCAdminCfgFileName: {old: &cm.config.Admin, new: new(config.Admin)},
config.ChatRPCChatCfgFileName: {old: &cm.config.Chat, new: new(config.Chat)},
config.RedisConfigFileName: {old: &cm.config.Redis, new: new(config.Redis)},
config.ShareFileName: {old: &cm.config.Share, new: new(config.Share)},
}

changedKeys := make([]string, 0, len(configMap))
for k, v := range configMap {
err := config.Load(
cm.configPath,
k,
config.EnvPrefixMap[k],
cm.runtimeEnv,
v.new,
)
if err != nil {
log.ZError(c, "load config failed", err)
continue
}
v.isChanged = reflect.DeepEqual(v.old, v.new)
if !v.isChanged {
changedKeys = append(changedKeys, k)
}
}

ops := make([]clientv3.Op, 0)
for _, k := range changedKeys {
data, err := json.Marshal(configMap[k].new)
if err != nil {
log.ZError(c, "marshal config failed", err)
continue
}
ops = append(ops, clientv3.OpPut(etcd.BuildKey(k), string(data)))
}
if len(ops) > 0 {
txn.Then(ops...)
_, err := txn.Commit()
if err != nil {
log.ZError(c, "commit etcd txn failed", err)
return
}
}
}

func (cm *ConfigManager) Restart(c *gin.Context) {
go cm.restart(c)
apiresp.GinSuccess(c, nil)
}

func (cm *ConfigManager) restart(c *gin.Context) {
time.Sleep(time.Millisecond * 200) // wait for Restart http call return
t := time.Now().Unix()
_, err := cm.client.Put(c, etcd.BuildKey(etcd.RestartKey), strconv.Itoa(int(t)))
if err != nil {
log.ZError(c, "restart etcd put key failed", err)
}
}
84 changes: 74 additions & 10 deletions internal/api/admin/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,32 +2,40 @@ package admin

import (
"context"
"errors"
"fmt"
"net/http"
"os"
"os/signal"
"syscall"
"time"

"github.com/gin-gonic/gin"
chatmw "github.com/openimsdk/chat/internal/api/mw"
"github.com/openimsdk/chat/internal/api/util"
"github.com/openimsdk/chat/pkg/common/config"
"github.com/openimsdk/chat/pkg/common/imapi"
"github.com/openimsdk/chat/pkg/common/kdisc"
disetcd "github.com/openimsdk/chat/pkg/common/kdisc/etcd"
adminclient "github.com/openimsdk/chat/pkg/protocol/admin"
chatclient "github.com/openimsdk/chat/pkg/protocol/chat"
"github.com/openimsdk/tools/discovery"
"github.com/openimsdk/tools/discovery/etcd"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/mw"
"github.com/openimsdk/tools/system/program"
"github.com/openimsdk/tools/utils/datautil"
"github.com/openimsdk/tools/utils/runtimeenv"
clientv3 "go.etcd.io/etcd/client/v3"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"

"github.com/openimsdk/tools/utils/runtimeenv"
)

type Config struct {
ApiConfig config.API

Discovery config.Discovery
Share config.Share
*config.AllConfig

RuntimeEnv string
ConfigPath string
}

func Start(ctx context.Context, index int, config *Config) error {
Expand All @@ -36,7 +44,7 @@ func Start(ctx context.Context, index int, config *Config) error {
if len(config.Share.ChatAdmin) == 0 {
return errs.New("share chat admin not configured")
}
apiPort, err := datautil.GetElemByIndex(config.ApiConfig.Api.Ports, index)
apiPort, err := datautil.GetElemByIndex(config.AdminAPI.Api.Ports, index)
if err != nil {
return err
}
Expand Down Expand Up @@ -66,11 +74,51 @@ func Start(ctx context.Context, index int, config *Config) error {
gin.SetMode(gin.ReleaseMode)
engine := gin.New()
engine.Use(gin.Recovery(), mw.CorsHandler(), mw.GinParseOperationID())
SetAdminRoute(engine, adminApi, mwApi)
return engine.Run(fmt.Sprintf(":%d", apiPort))
SetAdminRoute(engine, adminApi, mwApi, config, client)

if config.Discovery.Enable == kdisc.ETCDCONST {
cm := disetcd.NewConfigManager(client.(*etcd.SvcDiscoveryRegistryImpl).GetClient(), config.GetConfigNames())
cm.Watch(ctx)
}
var (
netDone = make(chan struct{}, 1)
netErr error
)
server := http.Server{Addr: fmt.Sprintf(":%d", apiPort), Handler: engine}
go func() {
err = server.ListenAndServe()
if err != nil && !errors.Is(err, http.ErrServerClosed) {
netErr = errs.WrapMsg(err, fmt.Sprintf("api start err: %s", server.Addr))
netDone <- struct{}{}
}
}()
shutdown := func() error {
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()
err := server.Shutdown(ctx)
if err != nil {
return errs.WrapMsg(err, "shutdown err")
}
return nil
}
disetcd.RegisterShutDown(shutdown)

sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGTERM)
select {
case <-sigs:
program.SIGTERMExit()
if err := shutdown(); err != nil {
return err
}
case <-netDone:
close(netDone)
return netErr
}
return nil
}

func SetAdminRoute(router gin.IRouter, admin *Api, mw *chatmw.MW) {
func SetAdminRoute(router gin.IRouter, admin *Api, mw *chatmw.MW, cfg *Config, client discovery.SvcDiscoveryRegistry) {

adminRouterGroup := router.Group("/account")
adminRouterGroup.POST("/login", admin.AdminLogin) // Login
Expand Down Expand Up @@ -149,4 +197,20 @@ func SetAdminRoute(router gin.IRouter, admin *Api, mw *chatmw.MW) {
applicationGroup.POST("/delete_version", mw.CheckAdmin, admin.DeleteApplicationVersion)
applicationGroup.POST("/latest_version", admin.LatestApplicationVersion)
applicationGroup.POST("/page_versions", admin.PageApplicationVersion)

var etcdClient *clientv3.Client
if cfg.Discovery.Enable == kdisc.ETCDCONST {
etcdClient = client.(*etcd.SvcDiscoveryRegistryImpl).GetClient()
}
cm := NewConfigManager(cfg.AllConfig, etcdClient, cfg.ConfigPath, cfg.RuntimeEnv)
{
configGroup := router.Group("/config", mw.CheckAdmin)
configGroup.POST("/get_config_list", cm.GetConfigList)
configGroup.POST("/get_config", cm.GetConfig)
configGroup.POST("/set_config", cm.SetConfig)
configGroup.POST("/reset_config", cm.ResetConfig)
}
{
router.POST("/restart", mw.CheckAdmin, cm.Restart)
}
}
Loading
Loading