diff --git a/config/openim-api.yml b/config/openim-api.yml index 4c38e1005b..a23b5fb314 100644 --- a/config/openim-api.yml +++ b/config/openim-api.yml @@ -10,7 +10,10 @@ api: prometheus: # Whether to enable prometheus enable: true + # autoSetPorts indicates whether to automatically set the ports + autoSetPorts: true # Prometheus listening ports, must match the number of api.ports + # It will only take effect when autoSetPorts is set to false. ports: [ 12002 ] # This address can be accessed via a browser grafanaURL: http://127.0.0.1:13000/ diff --git a/config/openim-msggateway.yml b/config/openim-msggateway.yml index 74eab35d54..d374ce3c76 100644 --- a/config/openim-msggateway.yml +++ b/config/openim-msggateway.yml @@ -11,6 +11,7 @@ prometheus: # Enable or disable Prometheus monitoring enable: true # List of ports that Prometheus listens on; these must match the number of rpc.ports to ensure correct monitoring setup + # It will only take effect when autoSetPorts is set to false. ports: [ 12140, 12141, 12142, 12143, 12144, 12145, 12146, 12147, 12148, 12149, 12150, 12151, 12152, 12153, 12154, 12155 ] # IP address that the RPC/WebSocket service listens on; setting to 0.0.0.0 listens on both internal and external IPs. If left blank, it automatically uses the internal network IP diff --git a/config/openim-msgtransfer.yml b/config/openim-msgtransfer.yml index 94ed073d86..39b23b222e 100644 --- a/config/openim-msgtransfer.yml +++ b/config/openim-msgtransfer.yml @@ -1,6 +1,8 @@ prometheus: # Enable or disable Prometheus monitoring enable: true + # autoSetPorts indicates whether to automatically set the ports + autoSetPorts: true # List of ports that Prometheus listens on; each port corresponds to an instance of monitoring. Ensure these are managed accordingly - # Because four instances have been launched, four ports need to be specified + # It will only take effect when autoSetPorts is set to false. ports: [ 12020, 12021, 12022, 12023, 12024, 12025, 12026, 12027, 12028, 12029, 12030, 12031, 12032, 12033, 12034, 12035 ] diff --git a/config/openim-push.yml b/config/openim-push.yml index 53c88ed410..ad4d032f9f 100644 --- a/config/openim-push.yml +++ b/config/openim-push.yml @@ -14,6 +14,7 @@ prometheus: # Enable or disable Prometheus monitoring enable: true # List of ports that Prometheus listens on; these must match the number of rpc.ports to ensure correct monitoring setup + # It will only take effect when autoSetPorts is set to false. ports: [ 12170, 12171, 12172, 12173, 12174, 12175, 12176, 12177, 12178, 12179, 12180, 12182, 12183, 12184, 12185, 12186 ] maxConcurrentWorkers: 3 diff --git a/config/openim-rpc-auth.yml b/config/openim-rpc-auth.yml index 230aa8720a..ed41b3305f 100644 --- a/config/openim-rpc-auth.yml +++ b/config/openim-rpc-auth.yml @@ -14,6 +14,7 @@ prometheus: # Enable or disable Prometheus monitoring enable: true # List of ports that Prometheus listens on; these must match the number of rpc.ports to ensure correct monitoring setup + # It will only take effect when autoSetPorts is set to false. ports: [ 12200 ] tokenPolicy: diff --git a/config/openim-rpc-conversation.yml b/config/openim-rpc-conversation.yml index ed61f70102..0636a76e33 100644 --- a/config/openim-rpc-conversation.yml +++ b/config/openim-rpc-conversation.yml @@ -13,4 +13,5 @@ prometheus: # Enable or disable Prometheus monitoring enable: true # List of ports that Prometheus listens on; these must match the number of rpc.ports to ensure correct monitoring setup + # It will only take effect when autoSetPorts is set to false. ports: [ 12220 ] diff --git a/config/openim-rpc-friend.yml b/config/openim-rpc-friend.yml index e10ef496a6..e2b150cec3 100644 --- a/config/openim-rpc-friend.yml +++ b/config/openim-rpc-friend.yml @@ -13,4 +13,5 @@ prometheus: # Enable or disable Prometheus monitoring enable: true # List of ports that Prometheus listens on; these must match the number of rpc.ports to ensure correct monitoring setup + # It will only take effect when autoSetPorts is set to false. ports: [ 12240 ] diff --git a/config/openim-rpc-group.yml b/config/openim-rpc-group.yml index 9f5ceababd..a8c2d5ec11 100644 --- a/config/openim-rpc-group.yml +++ b/config/openim-rpc-group.yml @@ -13,6 +13,7 @@ prometheus: # Enable or disable Prometheus monitoring enable: true # List of ports that Prometheus listens on; these must match the number of rpc.ports to ensure correct monitoring setup + # It will only take effect when autoSetPorts is set to false. ports: [ 12260 ] diff --git a/config/openim-rpc-msg.yml b/config/openim-rpc-msg.yml index 72e055a5d5..fdb6d8035e 100644 --- a/config/openim-rpc-msg.yml +++ b/config/openim-rpc-msg.yml @@ -13,6 +13,7 @@ prometheus: # Enable or disable Prometheus monitoring enable: true # List of ports that Prometheus listens on; these must match the number of rpc.ports to ensure correct monitoring setup + # It will only take effect when autoSetPorts is set to false. ports: [ 12280 ] diff --git a/config/openim-rpc-third.yml b/config/openim-rpc-third.yml index 9578e82c65..09c2a9cd51 100644 --- a/config/openim-rpc-third.yml +++ b/config/openim-rpc-third.yml @@ -13,6 +13,7 @@ prometheus: # Enable or disable Prometheus monitoring enable: true # List of ports that Prometheus listens on; these must match the number of rpc.ports to ensure correct monitoring setup + # It will only take effect when autoSetPorts is set to false. ports: [ 12300 ] diff --git a/config/openim-rpc-user.yml b/config/openim-rpc-user.yml index 6f7b9648b0..7da94ca0df 100644 --- a/config/openim-rpc-user.yml +++ b/config/openim-rpc-user.yml @@ -13,4 +13,5 @@ prometheus: # Whether to enable prometheus enable: true # Prometheus listening ports, must be consistent with the number of rpc.ports + # It will only take effect when autoSetPorts is set to false. ports: [ 12320 ] diff --git a/config/prometheus.yml b/config/prometheus.yml index ab427ee828..6fb1128249 100644 --- a/config/prometheus.yml +++ b/config/prometheus.yml @@ -26,61 +26,94 @@ scrape_configs: - job_name: node_exporter static_configs: - targets: [ internal_ip:20500 ] + - job_name: openimserver-openim-api - static_configs: - - targets: [ internal_ip:12002 ] - labels: - namespace: default + http_sd_configs: + - url: "http://internal_ip:10002/prometheus_discovery/api" +# static_configs: +# - targets: [ internal_ip:12002 ] +# labels: +# namespace: default + - job_name: openimserver-openim-msggateway - static_configs: - - targets: [ internal_ip:12140 ] -# - targets: [ internal_ip:12140, internal_ip:12141, internal_ip:12142, internal_ip:12143, internal_ip:12144, internal_ip:12145, internal_ip:12146, internal_ip:12147, internal_ip:12148, internal_ip:12149, internal_ip:12150, internal_ip:12151, internal_ip:12152, internal_ip:12153, internal_ip:12154, internal_ip:12155 ] - labels: - namespace: default + http_sd_configs: + - url: "http://internal_ip:10002/prometheus_discovery/msg_gateway" +# static_configs: +# - targets: [ internal_ip:12140 ] +# # - targets: [ internal_ip:12140, internal_ip:12141, internal_ip:12142, internal_ip:12143, internal_ip:12144, internal_ip:12145, internal_ip:12146, internal_ip:12147, internal_ip:12148, internal_ip:12149, internal_ip:12150, internal_ip:12151, internal_ip:12152, internal_ip:12153, internal_ip:12154, internal_ip:12155 ] +# labels: +# namespace: default + - job_name: openimserver-openim-msgtransfer - static_configs: - - targets: [ internal_ip:12020, internal_ip:12021, internal_ip:12022, internal_ip:12023, internal_ip:12024, internal_ip:12025, internal_ip:12026, internal_ip:12027 ] -# - targets: [ internal_ip:12020, internal_ip:12021, internal_ip:12022, internal_ip:12023, internal_ip:12024, internal_ip:12025, internal_ip:12026, internal_ip:12027, internal_ip:12028, internal_ip:12029, internal_ip:12030, internal_ip:12031, internal_ip:12032, internal_ip:12033, internal_ip:12034, internal_ip:12035 ] - labels: - namespace: default + http_sd_configs: + - url: "http://internal_ip:10002/prometheus_discovery/msg_transfer" +# static_configs: +# - targets: [ internal_ip:12020, internal_ip:12021, internal_ip:12022, internal_ip:12023, internal_ip:12024, internal_ip:12025, internal_ip:12026, internal_ip:12027 ] +# # - targets: [ internal_ip:12020, internal_ip:12021, internal_ip:12022, internal_ip:12023, internal_ip:12024, internal_ip:12025, internal_ip:12026, internal_ip:12027, internal_ip:12028, internal_ip:12029, internal_ip:12030, internal_ip:12031, internal_ip:12032, internal_ip:12033, internal_ip:12034, internal_ip:12035 ] +# labels: +# namespace: default + - job_name: openimserver-openim-push - static_configs: - - targets: [ internal_ip:12170, internal_ip:12171, internal_ip:12172, internal_ip:12173, internal_ip:12174, internal_ip:12175, internal_ip:12176, internal_ip:12177 ] -# - targets: [ internal_ip:12170, internal_ip:12171, internal_ip:12172, internal_ip:12173, internal_ip:12174, internal_ip:12175, internal_ip:12176, internal_ip:12177, internal_ip:12178, internal_ip:12179, internal_ip:12180, internal_ip:12182, internal_ip:12183, internal_ip:12184, internal_ip:12185, internal_ip:12186 ] - labels: - namespace: default + http_sd_configs: + - url: "http://internal_ip:10002/prometheus_discovery/push" +# static_configs: +# - targets: [ internal_ip:12170, internal_ip:12171, internal_ip:12172, internal_ip:12173, internal_ip:12174, internal_ip:12175, internal_ip:12176, internal_ip:12177 ] +## - targets: [ internal_ip:12170, internal_ip:12171, internal_ip:12172, internal_ip:12173, internal_ip:12174, internal_ip:12175, internal_ip:12176, internal_ip:12177, internal_ip:12178, internal_ip:12179, internal_ip:12180, internal_ip:12182, internal_ip:12183, internal_ip:12184, internal_ip:12185, internal_ip:12186 ] +# labels: +# namespace: default + - job_name: openimserver-openim-rpc-auth - static_configs: - - targets: [ internal_ip:12200 ] - labels: - namespace: default + http_sd_configs: + - url: "http://internal_ip:10002/prometheus_discovery/auth" +# static_configs: +# - targets: [ internal_ip:12200 ] +# labels: +# namespace: default + - job_name: openimserver-openim-rpc-conversation - static_configs: - - targets: [ internal_ip:12220 ] - labels: - namespace: default + http_sd_configs: + - url: "http://internal_ip:10002/prometheus_discovery/conversation" +# static_configs: +# - targets: [ internal_ip:12220 ] +# labels: +# namespace: default + - job_name: openimserver-openim-rpc-friend - static_configs: - - targets: [ internal_ip:12240 ] - labels: - namespace: default + http_sd_configs: + - url: "http://internal_ip:10002/prometheus_discovery/friend" +# static_configs: +# - targets: [ internal_ip:12240 ] +# labels: +# namespace: default + - job_name: openimserver-openim-rpc-group - static_configs: - - targets: [ internal_ip:12260 ] - labels: - namespace: default + http_sd_configs: + - url: "http://internal_ip:10002/prometheus_discovery/group" +# static_configs: +# - targets: [ internal_ip:12260 ] +# labels: +# namespace: default. + - job_name: openimserver-openim-rpc-msg - static_configs: - - targets: [ internal_ip:12280 ] - labels: - namespace: default + http_sd_configs: + - url: "http://internal_ip:10002/prometheus_discovery/msg" +# static_configs: +# - targets: [ internal_ip:12280 ] +# labels: +# namespace: default + - job_name: openimserver-openim-rpc-third - static_configs: - - targets: [ internal_ip:12300 ] - labels: - namespace: default + http_sd_configs: + - url: "http://internal_ip:10002/prometheus_discovery/third" +# static_configs: +# - targets: [ internal_ip:12300 ] +# labels: +# namespace: default + - job_name: openimserver-openim-rpc-user - static_configs: - - targets: [ internal_ip:12320 ] - labels: - namespace: default \ No newline at end of file + http_sd_configs: + - url: "http://internal_ip:10002/prometheus_discovery/user" +# static_configs: +# - targets: [ internal_ip:12320 ] +# labels: +# namespace: default \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml index 8d25383bc5..57e6542082 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -146,49 +146,49 @@ services: networks: - openim -# prometheus: -# image: ${PROMETHEUS_IMAGE} -# container_name: prometheus -# restart: always -# user: root -# volumes: -# - ./config/prometheus.yml:/etc/prometheus/prometheus.yml -# - ./config/instance-down-rules.yml:/etc/prometheus/instance-down-rules.yml -# - ${DATA_DIR}/components/prometheus/data:/prometheus -# command: -# - '--config.file=/etc/prometheus/prometheus.yml' -# - '--storage.tsdb.path=/prometheus' -# ports: -# - "19091:9090" -# networks: -# - openim -# -# alertmanager: -# image: ${ALERTMANAGER_IMAGE} -# container_name: alertmanager -# restart: always -# volumes: -# - ./config/alertmanager.yml:/etc/alertmanager/alertmanager.yml -# - ./config/email.tmpl:/etc/alertmanager/email.tmpl -# ports: -# - "19093:9093" -# networks: -# - openim -# -# grafana: -# image: ${GRAFANA_IMAGE} -# container_name: grafana -# user: root -# restart: always -# environment: -# - GF_SECURITY_ALLOW_EMBEDDING=true -# - GF_SESSION_COOKIE_SAMESITE=none -# - GF_SESSION_COOKIE_SECURE=true -# - GF_AUTH_ANONYMOUS_ENABLED=true -# - GF_AUTH_ANONYMOUS_ORG_ROLE=Admin -# ports: -# - "13000:3000" -# volumes: -# - ${DATA_DIR:-./}/components/grafana:/var/lib/grafana -# networks: -# - openim + prometheus: + image: ${PROMETHEUS_IMAGE} + container_name: prometheus + restart: always + user: root + volumes: + - ./config/prometheus.yml:/etc/prometheus/prometheus.yml + - ./config/instance-down-rules.yml:/etc/prometheus/instance-down-rules.yml + - ${DATA_DIR}/components/prometheus/data:/prometheus + command: + - '--config.file=/etc/prometheus/prometheus.yml' + - '--storage.tsdb.path=/prometheus' + ports: + - "19091:9090" + networks: + - openim + + alertmanager: + image: ${ALERTMANAGER_IMAGE} + container_name: alertmanager + restart: always + volumes: + - ./config/alertmanager.yml:/etc/alertmanager/alertmanager.yml + - ./config/email.tmpl:/etc/alertmanager/email.tmpl + ports: + - "19093:9093" + networks: + - openim + + grafana: + image: ${GRAFANA_IMAGE} + container_name: grafana + user: root + restart: always + environment: + - GF_SECURITY_ALLOW_EMBEDDING=true + - GF_SESSION_COOKIE_SAMESITE=none + - GF_SESSION_COOKIE_SECURE=true + - GF_AUTH_ANONYMOUS_ENABLED=true + - GF_AUTH_ANONYMOUS_ORG_ROLE=Admin + ports: + - "13000:3000" + volumes: + - ${DATA_DIR:-./}/components/grafana:/var/lib/grafana + networks: + - openim diff --git a/go.mod b/go.mod index 8400166469..d57a78add3 100644 --- a/go.mod +++ b/go.mod @@ -15,7 +15,7 @@ require ( github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/mitchellh/mapstructure v1.5.0 github.com/openimsdk/protocol v0.0.72-alpha.61 - github.com/openimsdk/tools v0.0.50-alpha.38 + github.com/openimsdk/tools v0.0.50-alpha.47 github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_golang v1.18.0 github.com/stretchr/testify v1.9.0 @@ -43,6 +43,7 @@ require ( github.com/shirou/gopsutil v3.21.11+incompatible github.com/spf13/viper v1.18.2 github.com/stathat/consistent v1.0.0 + go.etcd.io/etcd/client/v3 v3.5.13 go.uber.org/automaxprocs v1.5.3 golang.org/x/exp v0.0.0-20230905200255-921286631fa9 golang.org/x/sync v0.8.0 @@ -165,7 +166,6 @@ require ( github.com/yusufpapurcu/wmi v1.2.4 // indirect go.etcd.io/etcd/api/v3 v3.5.13 // indirect go.etcd.io/etcd/client/pkg/v3 v3.5.13 // indirect - go.etcd.io/etcd/client/v3 v3.5.13 // indirect go.opencensus.io v0.24.0 // indirect go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 // indirect diff --git a/go.sum b/go.sum index 1ca89efd4f..917d5d33ff 100644 --- a/go.sum +++ b/go.sum @@ -321,8 +321,8 @@ github.com/openimsdk/gomake v0.0.14-alpha.5 h1:VY9c5x515lTfmdhhPjMvR3BBRrRquAUCF github.com/openimsdk/gomake v0.0.14-alpha.5/go.mod h1:PndCozNc2IsQIciyn9mvEblYWZwJmAI+06z94EY+csI= github.com/openimsdk/protocol v0.0.72-alpha.61 h1:RuZR9/Sg3p6Bpb2CKPjPoA2AUmTvHITmhZ3PT/RbWMs= github.com/openimsdk/protocol v0.0.72-alpha.61/go.mod h1:Iet+piS/jaS+kWWyj6EEr36mk4ISzIRYjoMSVA4dq2M= -github.com/openimsdk/tools v0.0.50-alpha.38 h1:AU6/cvDfN4ciIOwAj8IWEwze3DeEp2cHYPgW3y0OlbU= -github.com/openimsdk/tools v0.0.50-alpha.38/go.mod h1:/Em/fQH46CuWf60+hcmvZyboGCQpSDEb2MdQ4nmQRAk= +github.com/openimsdk/tools v0.0.50-alpha.47 h1:Cfe2va/g6WhLjOoQqZkjrdlEDq1dUsfcQsdUB5oADVA= +github.com/openimsdk/tools v0.0.50-alpha.47/go.mod h1:muCtxguNJv8lFwLei27UASu2Nvg4ERSeN0R4K5tivk0= github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6Wq+LM= github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs= github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= diff --git a/internal/api/init.go b/internal/api/init.go index 6c25609f60..0ffa3369b3 100644 --- a/internal/api/init.go +++ b/internal/api/init.go @@ -16,9 +16,12 @@ package api import ( "context" + "errors" "fmt" "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "github.com/openimsdk/tools/discovery/etcd" "github.com/openimsdk/tools/utils/datautil" + "github.com/openimsdk/tools/utils/jsonutil" "github.com/openimsdk/tools/utils/network" "net" "net/http" @@ -62,16 +65,57 @@ func Start(ctx context.Context, index int, config *Config) error { prometheusPort int ) - router := newGinRouter(client, config) + registerIP, err := network.GetRpcRegisterIP("") + if err != nil { + return err + } + + getAutoPort := func() (net.Listener, int, error) { + registerAddr := net.JoinHostPort(registerIP, "0") + listener, err := net.Listen("tcp", registerAddr) + if err != nil { + return nil, 0, errs.WrapMsg(err, "listen err", "registerAddr", registerAddr) + } + _, portStr, _ := net.SplitHostPort(listener.Addr().String()) + port, _ := strconv.Atoi(portStr) + return listener, port, nil + } + + if config.API.Prometheus.AutoSetPorts && config.Discovery.Enable != kdisc.Etcd { + return errs.New("only etcd support autoSetPorts", "RegisterName", "api").Wrap() + } + + router := newGinRouter(client, config, client) if config.API.Prometheus.Enable { - go func() { + var ( + listener net.Listener + ) + + if config.API.Prometheus.AutoSetPorts { + listener, prometheusPort, err = getAutoPort() + if err != nil { + return err + } + + etcdClient := client.(*etcd.SvcDiscoveryRegistryImpl).GetClient() + + _, err = etcdClient.Put(ctx, prommetrics.BuildDiscoveryKey(prommetrics.APIKeyName), jsonutil.StructToJsonString(prommetrics.BuildDefaultTarget(registerIP, prometheusPort))) + if err != nil { + return errs.WrapMsg(err, "etcd put err") + } + } else { prometheusPort, err = datautil.GetElemByIndex(config.API.Prometheus.Ports, index) if err != nil { - netErr = err - netDone <- struct{}{} - return + return err + } + listener, err = net.Listen("tcp", fmt.Sprintf(":%d", prometheusPort)) + if err != nil { + return errs.WrapMsg(err, "listen err", "addr", fmt.Sprintf(":%d", prometheusPort)) } - if err := prommetrics.ApiInit(prometheusPort); err != nil && err != http.ErrServerClosed { + } + + go func() { + if err := prommetrics.ApiInit(listener); err != nil && !errors.Is(err, http.ErrServerClosed) { netErr = errs.WrapMsg(err, fmt.Sprintf("api prometheus start err: %d", prometheusPort)) netDone <- struct{}{} } @@ -84,7 +128,7 @@ func Start(ctx context.Context, index int, config *Config) error { log.CInfo(ctx, "API server is initializing", "address", address, "apiPort", apiPort, "prometheusPort", prometheusPort) go func() { err = server.ListenAndServe() - if err != nil && err != http.ErrServerClosed { + if err != nil && !errors.Is(err, http.ErrServerClosed) { netErr = errs.WrapMsg(err, fmt.Sprintf("api start err: %s", server.Addr)) netDone <- struct{}{} diff --git a/internal/api/prometheus_discovery.go b/internal/api/prometheus_discovery.go new file mode 100644 index 0000000000..6e33274be9 --- /dev/null +++ b/internal/api/prometheus_discovery.go @@ -0,0 +1,113 @@ +package api + +import ( + "encoding/json" + "github.com/gin-gonic/gin" + "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister" + "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" + "github.com/openimsdk/tools/apiresp" + "github.com/openimsdk/tools/discovery" + "github.com/openimsdk/tools/discovery/etcd" + "github.com/openimsdk/tools/errs" + "github.com/openimsdk/tools/log" + clientv3 "go.etcd.io/etcd/client/v3" + "net/http" +) + +type PrometheusDiscoveryApi struct { + config *Config + client *clientv3.Client +} + +func NewPrometheusDiscoveryApi(config *Config, client discovery.SvcDiscoveryRegistry) *PrometheusDiscoveryApi { + api := &PrometheusDiscoveryApi{ + config: config, + } + if config.Discovery.Enable == discoveryregister.Etcd { + api.client = client.(*etcd.SvcDiscoveryRegistryImpl).GetClient() + } + return api +} + +func (p *PrometheusDiscoveryApi) Enable(c *gin.Context) { + if p.config.Discovery.Enable != discoveryregister.Etcd { + c.JSON(http.StatusOK, []struct{}{}) + c.Abort() + } +} + +func (p *PrometheusDiscoveryApi) discovery(c *gin.Context, key string) { + eResp, err := p.client.Get(c, prommetrics.BuildDiscoveryKey(key)) + if err != nil { + // Log and respond with an error if preparation fails. + apiresp.GinError(c, errs.WrapMsg(err, "etcd get err")) + return + } + if len(eResp.Kvs) == 0 { + c.JSON(http.StatusOK, []*prommetrics.Target{}) + } + + var ( + resp = &prommetrics.RespTarget{ + Targets: make([]string, 0, len(eResp.Kvs)), + } + ) + + for i := range eResp.Kvs { + var target prommetrics.Target + err = json.Unmarshal(eResp.Kvs[i].Value, &target) + if err != nil { + log.ZError(c, "prometheus unmarshal err", errs.Wrap(err)) + } + resp.Targets = append(resp.Targets, target.Target) + if resp.Labels == nil { + resp.Labels = target.Labels + } + } + + c.JSON(200, []*prommetrics.RespTarget{resp}) +} + +func (p *PrometheusDiscoveryApi) Api(c *gin.Context) { + p.discovery(c, prommetrics.APIKeyName) +} + +func (p *PrometheusDiscoveryApi) User(c *gin.Context) { + p.discovery(c, p.config.Discovery.RpcService.User) +} + +func (p *PrometheusDiscoveryApi) Group(c *gin.Context) { + p.discovery(c, p.config.Discovery.RpcService.Group) +} + +func (p *PrometheusDiscoveryApi) Msg(c *gin.Context) { + p.discovery(c, p.config.Discovery.RpcService.Msg) +} + +func (p *PrometheusDiscoveryApi) Friend(c *gin.Context) { + p.discovery(c, p.config.Discovery.RpcService.Friend) +} + +func (p *PrometheusDiscoveryApi) Conversation(c *gin.Context) { + p.discovery(c, p.config.Discovery.RpcService.Conversation) +} + +func (p *PrometheusDiscoveryApi) Third(c *gin.Context) { + p.discovery(c, p.config.Discovery.RpcService.Third) +} + +func (p *PrometheusDiscoveryApi) Auth(c *gin.Context) { + p.discovery(c, p.config.Discovery.RpcService.Auth) +} + +func (p *PrometheusDiscoveryApi) Push(c *gin.Context) { + p.discovery(c, p.config.Discovery.RpcService.Push) +} + +func (p *PrometheusDiscoveryApi) MessageGateway(c *gin.Context) { + p.discovery(c, p.config.Discovery.RpcService.MessageGateway) +} + +func (p *PrometheusDiscoveryApi) MessageTransfer(c *gin.Context) { + p.discovery(c, prommetrics.MessageTransferKeyName) +} diff --git a/internal/api/router.go b/internal/api/router.go index 52d26bdc5f..6714d645c6 100644 --- a/internal/api/router.go +++ b/internal/api/router.go @@ -48,7 +48,7 @@ func prommetricsGin() gin.HandlerFunc { } } -func newGinRouter(disCov discovery.SvcDiscoveryRegistry, config *Config) *gin.Engine { +func newGinRouter(disCov discovery.SvcDiscoveryRegistry, config *Config, client discovery.SvcDiscoveryRegistry) *gin.Engine { disCov.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin"))) gin.SetMode(gin.ReleaseMode) @@ -78,6 +78,7 @@ func newGinRouter(disCov discovery.SvcDiscoveryRegistry, config *Config) *gin.En u := NewUserApi(*userRpc) m := NewMessageApi(messageRpc, userRpc, config.Share.IMAdminUserID) j := jssdk.NewJSSdkApi(userRpc.Client, friendRpc.Client, groupRpc.Client, messageRpc.Client, conversationRpc.Client) + pd := NewPrometheusDiscoveryApi(config, client) userRouterGroup := r.Group("/user") { userRouterGroup.POST("/user_register", u.UserRegister) @@ -254,6 +255,19 @@ func newGinRouter(disCov discovery.SvcDiscoveryRegistry, config *Config) *gin.En jssdk.POST("/get_conversations", j.GetConversations) jssdk.POST("/get_active_conversations", j.GetActiveConversations) + proDiscoveryGroup := r.Group("/prometheus_discovery", pd.Enable) + proDiscoveryGroup.GET("/api", pd.Api) + proDiscoveryGroup.GET("/user", pd.User) + proDiscoveryGroup.GET("/group", pd.Group) + proDiscoveryGroup.GET("/msg", pd.Msg) + proDiscoveryGroup.GET("/friend", pd.Friend) + proDiscoveryGroup.GET("/conversation", pd.Conversation) + proDiscoveryGroup.GET("/third", pd.Third) + proDiscoveryGroup.GET("/auth", pd.Auth) + proDiscoveryGroup.GET("/push", pd.Push) + proDiscoveryGroup.GET("/msg_gateway", pd.MessageGateway) + proDiscoveryGroup.GET("/msg_transfer", pd.MessageTransfer) + return r } diff --git a/internal/msggateway/init.go b/internal/msggateway/init.go index 1796a54822..4d07c89dc3 100644 --- a/internal/msggateway/init.go +++ b/internal/msggateway/init.go @@ -35,7 +35,7 @@ type Config struct { // Start run ws server. func Start(ctx context.Context, index int, conf *Config) error { - log.CInfo(ctx, "MSG-GATEWAY server is initializing", "autoSetPorts", conf.MsgGateway.RPC.AutoSetPorts, + log.CInfo(ctx, "MSG-GATEWAY server is initializing", "rpcPorts", conf.MsgGateway.RPC.Ports, "wsPort", conf.MsgGateway.LongConnSvr.Ports, "prometheusPorts", conf.MsgGateway.Prometheus.Ports) wsPort, err := datautil.GetElemByIndex(conf.MsgGateway.LongConnSvr.Ports, index) diff --git a/internal/msgtransfer/init.go b/internal/msgtransfer/init.go index b65532a8ea..9ff2c9c187 100644 --- a/internal/msgtransfer/init.go +++ b/internal/msgtransfer/init.go @@ -18,9 +18,14 @@ import ( "context" "errors" "fmt" + "github.com/openimsdk/tools/discovery/etcd" + "github.com/openimsdk/tools/utils/jsonutil" + "github.com/openimsdk/tools/utils/network" + "net" "net/http" "os" "os/signal" + "strconv" "syscall" "github.com/openimsdk/open-im-server/v3/pkg/common/prommetrics" @@ -32,6 +37,7 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/config" discRegister "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister" + kdisc "github.com/openimsdk/open-im-server/v3/pkg/common/discoveryregister" "github.com/openimsdk/open-im-server/v3/pkg/common/storage/controller" "github.com/openimsdk/open-im-server/v3/pkg/rpcclient" "github.com/openimsdk/tools/errs" @@ -134,21 +140,67 @@ func (m *MsgTransfer) Start(index int, config *Config) error { return err } + client, err := kdisc.NewDiscoveryRegister(&config.Discovery) + if err != nil { + return errs.WrapMsg(err, "failed to register discovery service") + } + + registerIP, err := network.GetRpcRegisterIP("") + if err != nil { + return err + } + + getAutoPort := func() (net.Listener, int, error) { + registerAddr := net.JoinHostPort(registerIP, "0") + listener, err := net.Listen("tcp", registerAddr) + if err != nil { + return nil, 0, errs.WrapMsg(err, "listen err", "registerAddr", registerAddr) + } + _, portStr, _ := net.SplitHostPort(listener.Addr().String()) + port, _ := strconv.Atoi(portStr) + return listener, port, nil + } + + if config.MsgTransfer.Prometheus.AutoSetPorts && config.Discovery.Enable != kdisc.Etcd { + return errs.New("only etcd support autoSetPorts", "RegisterName", "api").Wrap() + } + if config.MsgTransfer.Prometheus.Enable { + var ( + listener net.Listener + prometheusPort int + ) + + if config.MsgTransfer.Prometheus.AutoSetPorts { + listener, prometheusPort, err = getAutoPort() + if err != nil { + return err + } + + etcdClient := client.(*etcd.SvcDiscoveryRegistryImpl).GetClient() + + _, err = etcdClient.Put(context.TODO(), prommetrics.BuildDiscoveryKey(prommetrics.MessageTransferKeyName), jsonutil.StructToJsonString(prommetrics.BuildDefaultTarget(registerIP, prometheusPort))) + if err != nil { + return errs.WrapMsg(err, "etcd put err") + } + } else { + prometheusPort, err = datautil.GetElemByIndex(config.MsgTransfer.Prometheus.Ports, index) + if err != nil { + return err + } + listener, err = net.Listen("tcp", fmt.Sprintf(":%d", prometheusPort)) + if err != nil { + return errs.WrapMsg(err, "listen err", "addr", fmt.Sprintf(":%d", prometheusPort)) + } + } + go func() { defer func() { if r := recover(); r != nil { mw.PanicStackToLog(m.ctx, r) } }() - prometheusPort, err := datautil.GetElemByIndex(config.MsgTransfer.Prometheus.Ports, index) - if err != nil { - netErr = err - netDone <- struct{}{} - return - } - - if err := prommetrics.TransferInit(prometheusPort); err != nil && !errors.Is(err, http.ErrServerClosed) { + if err := prommetrics.TransferInit(listener); err != nil && !errors.Is(err, http.ErrServerClosed) { netErr = errs.WrapMsg(err, "prometheus start error", "prometheusPort", prometheusPort) netDone <- struct{}{} } diff --git a/pkg/common/config/config.go b/pkg/common/config/config.go index 6082b5f239..536a629900 100644 --- a/pkg/common/config/config.go +++ b/pkg/common/config/config.go @@ -106,9 +106,10 @@ type API struct { CompressionLevel int `mapstructure:"compressionLevel"` } `mapstructure:"api"` Prometheus struct { - Enable bool `mapstructure:"enable"` - Ports []int `mapstructure:"ports"` - GrafanaURL string `mapstructure:"grafanaURL"` + Enable bool `mapstructure:"enable"` + AutoSetPorts bool `mapstructure:"autoSetPorts"` + Ports []int `mapstructure:"ports"` + GrafanaURL string `mapstructure:"grafanaURL"` } `mapstructure:"prometheus"` } @@ -191,7 +192,11 @@ type MsgGateway struct { } type MsgTransfer struct { - Prometheus Prometheus `mapstructure:"prometheus"` + Prometheus struct { + Enable bool `mapstructure:"enable"` + AutoSetPorts bool `mapstructure:"autoSetPorts"` + Ports []int `mapstructure:"ports"` + } `mapstructure:"prometheus"` } type Push struct { diff --git a/pkg/common/config/load_config_test.go b/pkg/common/config/load_config_test.go index a0345fc7a5..763bffd9f5 100644 --- a/pkg/common/config/load_config_test.go +++ b/pkg/common/config/load_config_test.go @@ -59,3 +59,11 @@ func TestLoadOpenIMThirdConfig(t *testing.T) { // Environment: IMENV_OPENIM_RPC_THIRD_OBJECT_ENABLE=enabled;IMENV_OPENIM_RPC_THIRD_OBJECT_OSS_ENDPOINT=https://oss-cn-chengdu.aliyuncs.com;IMENV_OPENIM_RPC_THIRD_OBJECT_OSS_BUCKET=my_bucket_name;IMENV_OPENIM_RPC_THIRD_OBJECT_OSS_BUCKETURL=https://my_bucket_name.oss-cn-chengdu.aliyuncs.com;IMENV_OPENIM_RPC_THIRD_OBJECT_OSS_ACCESSKEYID=AKID1234567890;IMENV_OPENIM_RPC_THIRD_OBJECT_OSS_ACCESSKEYSECRET=abc123xyz789;IMENV_OPENIM_RPC_THIRD_OBJECT_OSS_SESSIONTOKEN=session_token_value;IMENV_OPENIM_RPC_THIRD_OBJECT_OSS_PUBLICREAD=true } + +func TestTransferConfig(t *testing.T) { + var tran MsgTransfer + err := LoadConfig("../../../config/openim-msgtransfer.yml", "IMENV_OPENIM-MSGTRANSFER", &tran) + assert.Nil(t, err) + assert.Equal(t, true, tran.Prometheus.Enable) + assert.Equal(t, true, tran.Prometheus.AutoSetPorts) +} diff --git a/pkg/common/discoveryregister/discoveryregister.go b/pkg/common/discoveryregister/discoveryregister.go index be82aa5265..18b54a4460 100644 --- a/pkg/common/discoveryregister/discoveryregister.go +++ b/pkg/common/discoveryregister/discoveryregister.go @@ -23,12 +23,16 @@ import ( "time" ) +const ( + Etcd = "etcd" +) + // NewDiscoveryRegister creates a new service discovery and registry client based on the provided environment type. func NewDiscoveryRegister(discovery *config.Discovery) (discovery.SvcDiscoveryRegistry, error) { switch discovery.Enable { case "k8s": return kubernetes.NewK8sDiscoveryRegister(discovery.RpcService.MessageGateway) - case "etcd": + case Etcd: return etcd.NewSvcDiscoveryRegistry( discovery.Etcd.RootDirectory, discovery.Etcd.Address, diff --git a/pkg/common/prommetrics/api.go b/pkg/common/prommetrics/api.go index 95b5c06b68..2dc5cb65d6 100644 --- a/pkg/common/prommetrics/api.go +++ b/pkg/common/prommetrics/api.go @@ -3,6 +3,7 @@ package prommetrics import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" + "net" "strconv" ) @@ -23,14 +24,14 @@ var ( ) ) -func ApiInit(prometheusPort int) error { +func ApiInit(listener net.Listener) error { apiRegistry := prometheus.NewRegistry() cs := append( baseCollector, apiCounter, httpCounter, ) - return Init(apiRegistry, prometheusPort, commonPath, promhttp.HandlerFor(apiRegistry, promhttp.HandlerOpts{}), cs...) + return Init(apiRegistry, listener, commonPath, promhttp.HandlerFor(apiRegistry, promhttp.HandlerOpts{}), cs...) } func APICall(path string, method string, apiCode int) { diff --git a/pkg/common/prommetrics/discovery.go b/pkg/common/prommetrics/discovery.go new file mode 100644 index 0000000000..8f03bc2ae1 --- /dev/null +++ b/pkg/common/prommetrics/discovery.go @@ -0,0 +1,31 @@ +package prommetrics + +import "fmt" + +const ( + APIKeyName = "api" + MessageTransferKeyName = "message-transfer" +) + +type Target struct { + Target string `json:"target"` + Labels map[string]string `json:"labels"` +} + +type RespTarget struct { + Targets []string `json:"targets"` + Labels map[string]string `json:"labels"` +} + +func BuildDiscoveryKey(name string) string { + return fmt.Sprintf("%s/%s/%s", "openim", "prometheus_discovery", name) +} + +func BuildDefaultTarget(host string, ip int) Target { + return Target{ + Target: fmt.Sprintf("%s:%d", host, ip), + Labels: map[string]string{ + "namespace": "default", + }, + } +} diff --git a/pkg/common/prommetrics/prommetrics.go b/pkg/common/prommetrics/prommetrics.go index 02e408d63b..2fc5d76b45 100644 --- a/pkg/common/prommetrics/prommetrics.go +++ b/pkg/common/prommetrics/prommetrics.go @@ -15,9 +15,9 @@ package prommetrics import ( - "fmt" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/collectors" + "net" "net/http" ) @@ -30,9 +30,9 @@ var ( } ) -func Init(registry *prometheus.Registry, prometheusPort int, path string, handler http.Handler, cs ...prometheus.Collector) error { +func Init(registry *prometheus.Registry, listener net.Listener, path string, handler http.Handler, cs ...prometheus.Collector) error { registry.MustRegister(cs...) srv := http.NewServeMux() srv.Handle(path, handler) - return http.ListenAndServe(fmt.Sprintf(":%d", prometheusPort), srv) + return http.Serve(listener, srv) } diff --git a/pkg/common/prommetrics/rpc.go b/pkg/common/prommetrics/rpc.go index 809d509b28..3f115d30bc 100644 --- a/pkg/common/prommetrics/rpc.go +++ b/pkg/common/prommetrics/rpc.go @@ -5,6 +5,7 @@ import ( "github.com/openimsdk/open-im-server/v3/pkg/common/config" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" + "net" "strconv" ) @@ -21,13 +22,13 @@ var ( ) ) -func RpcInit(cs []prometheus.Collector, prometheusPort int) error { +func RpcInit(cs []prometheus.Collector, listener net.Listener) error { reg := prometheus.NewRegistry() cs = append(append( baseCollector, rpcCounter, ), cs...) - return Init(reg, prometheusPort, rpcPath, promhttp.HandlerFor(reg, promhttp.HandlerOpts{Registry: reg}), cs...) + return Init(reg, listener, rpcPath, promhttp.HandlerFor(reg, promhttp.HandlerOpts{Registry: reg}), cs...) } func RPCCall(name string, path string, code int) { diff --git a/pkg/common/prommetrics/transfer.go b/pkg/common/prommetrics/transfer.go index f0abb82856..36fe1d5685 100644 --- a/pkg/common/prommetrics/transfer.go +++ b/pkg/common/prommetrics/transfer.go @@ -17,6 +17,7 @@ package prommetrics import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" + "net" ) var ( @@ -42,7 +43,7 @@ var ( }) ) -func TransferInit(prometheusPort int) error { +func TransferInit(listener net.Listener) error { reg := prometheus.NewRegistry() cs := append( baseCollector, @@ -52,5 +53,5 @@ func TransferInit(prometheusPort int) error { MsgInsertMongoFailedCounter, SeqSetFailedCounter, ) - return Init(reg, prometheusPort, commonPath, promhttp.HandlerFor(reg, promhttp.HandlerOpts{Registry: reg}), cs...) + return Init(reg, listener, commonPath, promhttp.HandlerFor(reg, promhttp.HandlerOpts{Registry: reg}), cs...) } diff --git a/pkg/common/startrpc/start.go b/pkg/common/startrpc/start.go index 7671b17360..30386acac9 100644 --- a/pkg/common/startrpc/start.go +++ b/pkg/common/startrpc/start.go @@ -19,7 +19,9 @@ import ( "errors" "fmt" "github.com/openimsdk/open-im-server/v3/pkg/common/config" + "github.com/openimsdk/tools/discovery/etcd" "github.com/openimsdk/tools/utils/datautil" + "github.com/openimsdk/tools/utils/jsonutil" "google.golang.org/grpc/status" "net" "net/http" @@ -46,11 +48,17 @@ func Start[T any](ctx context.Context, discovery *config.Discovery, prometheusCo config T, client discovery.SvcDiscoveryRegistry, server *grpc.Server) error, options ...grpc.ServerOption) error { var ( - rpcTcpAddr string - netDone = make(chan struct{}, 2) - netErr error + rpcTcpAddr string + netDone = make(chan struct{}, 2) + netErr error + prometheusPort int ) + registerIP, err := network.GetRpcRegisterIP(registerIP) + if err != nil { + return err + } + if !autoSetPorts { rpcPort, err := datautil.GetElemByIndex(rpcPorts, index) if err != nil { @@ -61,6 +69,27 @@ func Start[T any](ctx context.Context, discovery *config.Discovery, prometheusCo rpcTcpAddr = net.JoinHostPort(network.GetListenIP(listenIP), "0") } + getAutoPort := func() (net.Listener, int, error) { + listener, err := net.Listen("tcp", rpcTcpAddr) + if err != nil { + return nil, 0, errs.WrapMsg(err, "listen err", "rpcTcpAddr", rpcTcpAddr) + } + _, portStr, _ := net.SplitHostPort(listener.Addr().String()) + port, _ := strconv.Atoi(portStr) + return listener, port, nil + } + + if autoSetPorts && discovery.Enable != kdisc.Etcd { + return errs.New("only etcd support autoSetPorts", "rpcRegisterName", rpcRegisterName).Wrap() + } + client, err := kdisc.NewDiscoveryRegister(discovery) + if err != nil { + return err + } + + defer client.Close() + client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin"))) + // var reg *prometheus.Registry // var metric *grpcprometheus.ServerMetrics if prometheusConfig.Enable { @@ -73,17 +102,41 @@ func Start[T any](ctx context.Context, discovery *config.Discovery, prometheusCo prommetricsUnaryInterceptor(rpcRegisterName), prommetricsStreamInterceptor(rpcRegisterName), ) - prometheusPort, err := datautil.GetElemByIndex(prometheusConfig.Ports, index) - if err != nil { - return err + + var ( + listener net.Listener + ) + + if autoSetPorts { + listener, prometheusPort, err = getAutoPort() + if err != nil { + return err + } + + etcdClient := client.(*etcd.SvcDiscoveryRegistryImpl).GetClient() + + _, err = etcdClient.Put(ctx, prommetrics.BuildDiscoveryKey(rpcRegisterName), jsonutil.StructToJsonString(prommetrics.BuildDefaultTarget(registerIP, prometheusPort))) + if err != nil { + return errs.WrapMsg(err, "etcd put err") + } + } else { + prometheusPort, err = datautil.GetElemByIndex(prometheusConfig.Ports, index) + if err != nil { + return err + } + listener, err = net.Listen("tcp", fmt.Sprintf(":%d", prometheusPort)) + if err != nil { + return errs.WrapMsg(err, "listen err", "rpcTcpAddr", rpcTcpAddr) + } } + cs := prommetrics.GetGrpcCusMetrics(rpcRegisterName, discovery) go func() { - if err := prommetrics.RpcInit(cs, prometheusPort); err != nil && !errors.Is(err, http.ErrServerClosed) { + if err := prommetrics.RpcInit(cs, listener); err != nil && !errors.Is(err, http.ErrServerClosed) { netErr = errs.WrapMsg(err, fmt.Sprintf("rpc %s prometheus start err: %d", rpcRegisterName, prometheusPort)) netDone <- struct{}{} } - // metric.InitializeMetrics(srv) + //metric.InitializeMetrics(srv) // Create a HTTP server for prometheus. // httpServer = &http.Server{Handler: promhttp.HandlerFor(reg, promhttp.HandlerOpts{}), Addr: fmt.Sprintf("0.0.0.0:%d", prometheusPort)} // if err := httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed { @@ -95,30 +148,15 @@ func Start[T any](ctx context.Context, discovery *config.Discovery, prometheusCo options = append(options, mw.GrpcServer()) } - listener, err := net.Listen( - "tcp", - rpcTcpAddr, - ) - if err != nil { - return errs.WrapMsg(err, "listen err", "rpcTcpAddr", rpcTcpAddr) - } - - _, portStr, _ := net.SplitHostPort(listener.Addr().String()) - registerIP = network.GetListenIP(registerIP) - port, _ := strconv.Atoi(portStr) - - log.CInfo(ctx, "RPC server is initializing", "rpcRegisterName", rpcRegisterName, "rpcPort", portStr, - "prometheusPorts", prometheusConfig.Ports) - - defer listener.Close() - client, err := kdisc.NewDiscoveryRegister(discovery) + listener, port, err := getAutoPort() if err != nil { return err } - defer client.Close() - client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin"))) + log.CInfo(ctx, "RPC server is initializing", "rpcRegisterName", rpcRegisterName, "rpcPort", port, + "prometheusPort", prometheusPort) + defer listener.Close() srv := grpc.NewServer(options...) err = rpcFn(ctx, config, client, srv)