Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: config center #2997

Merged
merged 45 commits into from
Dec 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
15ef276
chore: config
icey-yu Dec 19, 2024
6d60997
chore: config
icey-yu Dec 19, 2024
7f1a723
chore: config
icey-yu Dec 20, 2024
d21eb35
chore: config
icey-yu Dec 20, 2024
d72d314
chore: config
icey-yu Dec 20, 2024
12adfa5
feat: config
icey-yu Dec 23, 2024
fb9d8ac
fix: config
icey-yu Dec 23, 2024
42588ee
fix: config
icey-yu Dec 23, 2024
66f8df4
feat: config
icey-yu Dec 23, 2024
152d241
feat: config
icey-yu Dec 24, 2024
b92ab6f
feat: config
icey-yu Dec 24, 2024
755efcb
feat: config
icey-yu Dec 24, 2024
5d44940
feat: config
icey-yu Dec 24, 2024
fa79fac
feat: config
icey-yu Dec 24, 2024
8acd21e
feat: config
icey-yu Dec 24, 2024
3f482d6
feat: config
icey-yu Dec 24, 2024
1d1ac6e
feat: config
icey-yu Dec 24, 2024
aafa25c
feat: config
icey-yu Dec 24, 2024
bc5cfb2
feat: config
icey-yu Dec 24, 2024
6d38cd3
feat: config
icey-yu Dec 24, 2024
504ad90
feat: config
icey-yu Dec 24, 2024
99890e8
feat: config
icey-yu Dec 24, 2024
b5426bb
feat: config
icey-yu Dec 24, 2024
568a1c4
feat: config
icey-yu Dec 24, 2024
5cc018d
Merge: main
icey-yu Dec 24, 2024
2e4905e
feat: config
icey-yu Dec 24, 2024
62da33b
feat: config
icey-yu Dec 24, 2024
c1204e3
fix: config
icey-yu Dec 25, 2024
51fe509
fix: config
icey-yu Dec 25, 2024
4963b8f
fix: config
icey-yu Dec 25, 2024
5fd409d
fix: config
icey-yu Dec 25, 2024
59cbabe
fix: config
icey-yu Dec 25, 2024
bc09ca0
fix: config
icey-yu Dec 25, 2024
1750ca8
fix: config
icey-yu Dec 25, 2024
8a9c921
fix: config
icey-yu Dec 25, 2024
541120d
fix: config
icey-yu Dec 25, 2024
c0d4c26
fix: config
icey-yu Dec 25, 2024
81af37f
fix: config
icey-yu Dec 25, 2024
8451312
fix: config
icey-yu Dec 25, 2024
10a2f75
fix: config
icey-yu Dec 25, 2024
b66f32e
fix: config
icey-yu Dec 25, 2024
b783da2
fix: config
icey-yu Dec 25, 2024
54c6397
fix: config
icey-yu Dec 25, 2024
ccdd6d5
fix: config
icey-yu Dec 25, 2024
d8c15b2
fix: config
icey-yu Dec 25, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion config/openim-push.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ prometheus:
maxConcurrentWorkers: 3
#Use geTui for offline push notifications, or choose fcm or jpns; corresponding configuration settings must be specified.
enable:
geTui:
getui:
pushUrl: https://restapi.getui.com/v2/$appId
masterSecret:
appKey:
Expand Down
2 changes: 2 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ services:
- ETCD_INITIAL_CLUSTER=s1=http://0.0.0.0:2380
- ETCD_INITIAL_CLUSTER_TOKEN=tkn
- ETCD_INITIAL_CLUSTER_STATE=new
volumes:
- "${DATA_DIR}/components/etcd:/etcd-data"
restart: always
networks:
- openim
Expand Down
250 changes: 250 additions & 0 deletions internal/api/config_manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,250 @@
package api

import (
"encoding/json"
"reflect"
"strconv"
"time"

"github.com/gin-gonic/gin"
"github.com/openimsdk/open-im-server/v3/pkg/apistruct"
"github.com/openimsdk/open-im-server/v3/pkg/authverify"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/discovery/etcd"
"github.com/openimsdk/open-im-server/v3/version"
"github.com/openimsdk/tools/apiresp"
"github.com/openimsdk/tools/errs"
"github.com/openimsdk/tools/log"
"github.com/openimsdk/tools/utils/runtimeenv"
clientv3 "go.etcd.io/etcd/client/v3"
)

type ConfigManager struct {
imAdminUserID []string
config *config.AllConfig
client *clientv3.Client
configPath string
runtimeEnv string
}

func NewConfigManager(IMAdminUserID []string, cfg *config.AllConfig, client *clientv3.Client, configPath string, runtimeEnv string) *ConfigManager {
return &ConfigManager{
imAdminUserID: IMAdminUserID,
config: cfg,
client: client,
configPath: configPath,
runtimeEnv: runtimeEnv,
}
}

func (cm *ConfigManager) CheckAdmin(c *gin.Context) {
if err := authverify.CheckAdmin(c, cm.imAdminUserID); err != nil {
apiresp.GinError(c, err)
c.Abort()
}
}

func (cm *ConfigManager) GetConfig(c *gin.Context) {
var req apistruct.GetConfigReq
if err := c.BindJSON(&req); err != nil {
apiresp.GinError(c, errs.ErrArgs.WithDetail(err.Error()).Wrap())
return
}
conf := cm.config.Name2Config(req.ConfigName)
if conf == nil {
apiresp.GinError(c, errs.ErrArgs.WithDetail("config name not found").Wrap())
return
}
b, err := json.Marshal(conf)
if err != nil {
apiresp.GinError(c, err)
return
}
apiresp.GinSuccess(c, string(b))
}

func (cm *ConfigManager) GetConfigList(c *gin.Context) {
var resp apistruct.GetConfigListResp
resp.ConfigNames = cm.config.GetConfigNames()
resp.Environment = runtimeenv.PrintRuntimeEnvironment()
resp.Version = version.Version

apiresp.GinSuccess(c, resp)
}

func (cm *ConfigManager) SetConfig(c *gin.Context) {
if cm.config.Discovery.Enable != config.ETCD {
apiresp.GinError(c, errs.New("only etcd support set config").Wrap())
return
}
var req apistruct.SetConfigReq
if err := c.BindJSON(&req); err != nil {
apiresp.GinError(c, errs.ErrArgs.WithDetail(err.Error()).Wrap())
return
}
var err error
switch req.ConfigName {
case cm.config.Discovery.GetConfigFileName():
err = compareAndSave[config.Discovery](c, cm.config.Name2Config(req.ConfigName), &req, cm.client)
case cm.config.Kafka.GetConfigFileName():
err = compareAndSave[config.Kafka](c, cm.config.Name2Config(req.ConfigName), &req, cm.client)
case cm.config.LocalCache.GetConfigFileName():
err = compareAndSave[config.LocalCache](c, cm.config.Name2Config(req.ConfigName), &req, cm.client)
case cm.config.Log.GetConfigFileName():
err = compareAndSave[config.Log](c, cm.config.Name2Config(req.ConfigName), &req, cm.client)
case cm.config.Minio.GetConfigFileName():
err = compareAndSave[config.Minio](c, cm.config.Name2Config(req.ConfigName), &req, cm.client)
case cm.config.Mongo.GetConfigFileName():
err = compareAndSave[config.Mongo](c, cm.config.Name2Config(req.ConfigName), &req, cm.client)
case cm.config.Notification.GetConfigFileName():
err = compareAndSave[config.Notification](c, cm.config.Name2Config(req.ConfigName), &req, cm.client)
case cm.config.API.GetConfigFileName():
err = compareAndSave[config.API](c, cm.config.Name2Config(req.ConfigName), &req, cm.client)
case cm.config.CronTask.GetConfigFileName():
err = compareAndSave[config.CronTask](c, cm.config.Name2Config(req.ConfigName), &req, cm.client)
case cm.config.MsgGateway.GetConfigFileName():
err = compareAndSave[config.MsgGateway](c, cm.config.Name2Config(req.ConfigName), &req, cm.client)
case cm.config.MsgTransfer.GetConfigFileName():
err = compareAndSave[config.MsgTransfer](c, cm.config.Name2Config(req.ConfigName), &req, cm.client)
case cm.config.Push.GetConfigFileName():
err = compareAndSave[config.Push](c, cm.config.Name2Config(req.ConfigName), &req, cm.client)
case cm.config.Auth.GetConfigFileName():
err = compareAndSave[config.Auth](c, cm.config.Name2Config(req.ConfigName), &req, cm.client)
case cm.config.Conversation.GetConfigFileName():
err = compareAndSave[config.Conversation](c, cm.config.Name2Config(req.ConfigName), &req, cm.client)
case cm.config.Friend.GetConfigFileName():
err = compareAndSave[config.Friend](c, cm.config.Name2Config(req.ConfigName), &req, cm.client)
case cm.config.Group.GetConfigFileName():
err = compareAndSave[config.Group](c, cm.config.Name2Config(req.ConfigName), &req, cm.client)
case cm.config.Msg.GetConfigFileName():
err = compareAndSave[config.Msg](c, cm.config.Name2Config(req.ConfigName), &req, cm.client)
case cm.config.Third.GetConfigFileName():
err = compareAndSave[config.Third](c, cm.config.Name2Config(req.ConfigName), &req, cm.client)
case cm.config.User.GetConfigFileName():
err = compareAndSave[config.User](c, cm.config.Name2Config(req.ConfigName), &req, cm.client)
case cm.config.Redis.GetConfigFileName():
err = compareAndSave[config.Redis](c, cm.config.Name2Config(req.ConfigName), &req, cm.client)
case cm.config.Share.GetConfigFileName():
err = compareAndSave[config.Share](c, cm.config.Name2Config(req.ConfigName), &req, cm.client)
case cm.config.Webhooks.GetConfigFileName():
err = compareAndSave[config.Webhooks](c, cm.config.Name2Config(req.ConfigName), &req, cm.client)
default:
apiresp.GinError(c, errs.ErrArgs.Wrap())
return
}
if err != nil {
apiresp.GinError(c, errs.ErrArgs.WithDetail(err.Error()).Wrap())
return
}
apiresp.GinSuccess(c, nil)
}

func compareAndSave[T any](c *gin.Context, old any, req *apistruct.SetConfigReq, client *clientv3.Client) error {
conf := new(T)
err := json.Unmarshal([]byte(req.Data), &conf)
if err != nil {
return errs.ErrArgs.WithDetail(err.Error()).Wrap()
}
eq := reflect.DeepEqual(old, conf)
if eq {
return nil
}
data, err := json.Marshal(conf)
if err != nil {
return errs.ErrArgs.WithDetail(err.Error()).Wrap()
}
_, err = client.Put(c, etcd.BuildKey(req.ConfigName), string(data))
if err != nil {
return errs.WrapMsg(err, "save to etcd failed")
}
return nil
}

func (cm *ConfigManager) ResetConfig(c *gin.Context) {
go cm.resetConfig(c)
apiresp.GinSuccess(c, nil)
}

func (cm *ConfigManager) resetConfig(c *gin.Context) {
txn := cm.client.Txn(c)
type initConf struct {
old any
new any
isChanged bool
}
configMap := map[string]*initConf{
cm.config.Discovery.GetConfigFileName(): {old: &cm.config.Discovery, new: new(config.Discovery)},
cm.config.Kafka.GetConfigFileName(): {old: &cm.config.Kafka, new: new(config.Kafka)},
cm.config.LocalCache.GetConfigFileName(): {old: &cm.config.LocalCache, new: new(config.LocalCache)},
cm.config.Log.GetConfigFileName(): {old: &cm.config.Log, new: new(config.Log)},
cm.config.Minio.GetConfigFileName(): {old: &cm.config.Minio, new: new(config.Minio)},
cm.config.Mongo.GetConfigFileName(): {old: &cm.config.Mongo, new: new(config.Mongo)},
cm.config.Notification.GetConfigFileName(): {old: &cm.config.Notification, new: new(config.Notification)},
cm.config.API.GetConfigFileName(): {old: &cm.config.API, new: new(config.API)},
cm.config.CronTask.GetConfigFileName(): {old: &cm.config.CronTask, new: new(config.CronTask)},
cm.config.MsgGateway.GetConfigFileName(): {old: &cm.config.MsgGateway, new: new(config.MsgGateway)},
cm.config.MsgTransfer.GetConfigFileName(): {old: &cm.config.MsgTransfer, new: new(config.MsgTransfer)},
cm.config.Push.GetConfigFileName(): {old: &cm.config.Push, new: new(config.Push)},
cm.config.Auth.GetConfigFileName(): {old: &cm.config.Auth, new: new(config.Auth)},
cm.config.Conversation.GetConfigFileName(): {old: &cm.config.Conversation, new: new(config.Conversation)},
cm.config.Friend.GetConfigFileName(): {old: &cm.config.Friend, new: new(config.Friend)},
cm.config.Group.GetConfigFileName(): {old: &cm.config.Group, new: new(config.Group)},
cm.config.Msg.GetConfigFileName(): {old: &cm.config.Msg, new: new(config.Msg)},
cm.config.Third.GetConfigFileName(): {old: &cm.config.Third, new: new(config.Third)},
cm.config.User.GetConfigFileName(): {old: &cm.config.User, new: new(config.User)},
cm.config.Redis.GetConfigFileName(): {old: &cm.config.Redis, new: new(config.Redis)},
cm.config.Share.GetConfigFileName(): {old: &cm.config.Share, new: new(config.Share)},
cm.config.Webhooks.GetConfigFileName(): {old: &cm.config.Webhooks, new: new(config.Webhooks)},
}

changedKeys := make([]string, 0, len(configMap))
for k, v := range configMap {
err := config.Load(
cm.configPath,
k,
config.EnvPrefixMap[k],
cm.runtimeEnv,
v.new,
)
if err != nil {
log.ZError(c, "load config failed", err)
continue
}
v.isChanged = reflect.DeepEqual(v.old, v.new)
if !v.isChanged {
changedKeys = append(changedKeys, k)
}
}

ops := make([]clientv3.Op, 0)
for _, k := range changedKeys {
data, err := json.Marshal(configMap[k].new)
if err != nil {
log.ZError(c, "marshal config failed", err)
continue
}
ops = append(ops, clientv3.OpPut(etcd.BuildKey(k), string(data)))
}
if len(ops) > 0 {
txn.Then(ops...)
_, err := txn.Commit()
if err != nil {
log.ZError(c, "commit etcd txn failed", err)
return
}
}
}

func (cm *ConfigManager) Restart(c *gin.Context) {
go cm.restart(c)
apiresp.GinSuccess(c, nil)
}

func (cm *ConfigManager) restart(c *gin.Context) {
time.Sleep(time.Millisecond * 200) // wait for Restart http call return
t := time.Now().Unix()
_, err := cm.client.Put(c, etcd.BuildKey(etcd.RestartKey), strconv.Itoa(int(t)))
if err != nil {
log.ZError(c, "restart etcd put key failed", err)
}
}
31 changes: 21 additions & 10 deletions internal/api/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ import (
"time"

conf "github.com/openimsdk/open-im-server/v3/pkg/common/config"
kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister"
kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discovery"
disetcd "github.com/openimsdk/open-im-server/v3/pkg/common/discovery/etcd"
"github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics"
"github.com/openimsdk/tools/discovery/etcd"
"github.com/openimsdk/tools/errs"
Expand All @@ -43,11 +44,10 @@ import (
)

type Config struct {
API conf.API
Share conf.Share
Discovery conf.Discovery
*conf.AllConfig

RuntimeEnv string
ConfigPath string
}

func Start(ctx context.Context, index int, config *Config) error {
Expand Down Expand Up @@ -139,22 +139,33 @@ func Start(ctx context.Context, index int, config *Config) error {
if err != nil && !errors.Is(err, http.ErrServerClosed) {
netErr = errs.WrapMsg(err, fmt.Sprintf("api start err: %s", server.Addr))
netDone <- struct{}{}

}
}()

if config.Discovery.Enable == conf.ETCD {
cm := disetcd.NewConfigManager(client.(*etcd.SvcDiscoveryRegistryImpl).GetClient(), config.GetConfigNames())
cm.Watch(ctx)
}

sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGTERM)

ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()
select {
case <-sigs:
program.SIGTERMExit()
shutdown := func() error {
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()
err := server.Shutdown(ctx)
if err != nil {
return errs.WrapMsg(err, "shutdown err")
}
return nil
}
disetcd.RegisterShutDown(shutdown)
select {
case <-sigs:
program.SIGTERMExit()
if err := shutdown(); err != nil {
return err
}
case <-netDone:
close(netDone)
return netErr
Expand Down
Loading
Loading