Skip to content

Commit

Permalink
cmd/sync: add p8s metrics for sync command (#4119)
Browse files Browse the repository at this point in the history
depend on juicedata/mpb#2
close #4109
  • Loading branch information
zhijian-pro authored Oct 25, 2023
1 parent 9a0cbbf commit 86e69e4
Show file tree
Hide file tree
Showing 10 changed files with 162 additions and 40 deletions.
14 changes: 9 additions & 5 deletions cmd/mdtest.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,13 @@ import (
"sync"
"time"

"github.com/juicedata/juicefs/pkg/utils"
"github.com/mattn/go-isatty"

"github.com/juicedata/juicefs/pkg/chunk"
"github.com/juicedata/juicefs/pkg/fs"
"github.com/juicedata/juicefs/pkg/meta"
"github.com/juicedata/juicefs/pkg/metric"
"github.com/juicedata/juicefs/pkg/utils"
"github.com/juicedata/juicefs/pkg/vfs"
"github.com/mattn/go-isatty"
"github.com/urfave/cli/v2"
)

Expand Down Expand Up @@ -221,9 +221,13 @@ func initForMdtest(c *cli.Context, mp string, metaUrl string) *fs.FileSystem {
conf.EntryTimeout = time.Millisecond * time.Duration(c.Float64("entry-cache")*1000)
conf.DirEntryTimeout = time.Millisecond * time.Duration(c.Float64("dir-entry-cache")*1000)

metricsAddr := exposeMetrics(c, m, registerer, registry)
metricsAddr := exposeMetrics(c, registerer, registry)
m.InitMetrics(registerer)
vfs.InitMetrics(registerer)
if c.IsSet("consul") {
metric.RegisterToConsul(c.String("consul"), metricsAddr, conf.Meta.MountPoint)
metadata := make(map[string]string)
metadata["mountPoint"] = conf.Meta.MountPoint
metric.RegisterToConsul(c.String("consul"), metricsAddr, metadata)
}
jfs, err := fs.NewFileSystem(conf, m, store)
if err != nil {
Expand Down
15 changes: 8 additions & 7 deletions cmd/mount.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,17 +103,14 @@ func installHandler(mp string) {
}()
}

func exposeMetrics(c *cli.Context, m meta.Meta, registerer prometheus.Registerer, registry *prometheus.Registry) string {
func exposeMetrics(c *cli.Context, registerer prometheus.Registerer, registry *prometheus.Registry) string {
var ip, port string
//default set
ip, port, err := net.SplitHostPort(c.String("metrics"))
if err != nil {
logger.Fatalf("metrics format error: %v", err)
}

m.InitMetrics(registerer)
vfs.InitMetrics(registerer)
go metric.UpdateMetrics(m, registerer)
go metric.UpdateMetrics(registerer)
http.Handle("/metrics", promhttp.HandlerFor(
registry,
promhttp.HandlerOpts{
Expand Down Expand Up @@ -424,10 +421,14 @@ func getChunkConf(c *cli.Context, format *meta.Format) *chunk.Config {
}

func initBackgroundTasks(c *cli.Context, vfsConf *vfs.Config, metaConf *meta.Config, m meta.Meta, blob object.ObjectStorage, registerer prometheus.Registerer, registry *prometheus.Registry) {
metricsAddr := exposeMetrics(c, m, registerer, registry)
metricsAddr := exposeMetrics(c, registerer, registry)
m.InitMetrics(registerer)
vfs.InitMetrics(registerer)
vfsConf.Port.PrometheusAgent = metricsAddr
if c.IsSet("consul") {
metric.RegisterToConsul(c.String("consul"), metricsAddr, vfsConf.Meta.MountPoint)
metadata := make(map[string]string)
metadata["mountPoint"] = vfsConf.Meta.MountPoint
metric.RegisterToConsul(c.String("consul"), metricsAddr, metadata)
vfsConf.Port.ConsulAddr = c.String("consul")
}
if !metaConf.ReadOnly && !metaConf.NoBGJob && vfsConf.BackupMeta > 0 {
Expand Down
5 changes: 3 additions & 2 deletions cmd/mount_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,9 @@ func Test_exposeMetrics(t *testing.T) {
defer isSetPatches.Reset()
ResetHttp()
registerer, registry := wrapRegister("test", "test")
metricsAddr := exposeMetrics(appCtx, client, registerer, registry)

metricsAddr := exposeMetrics(appCtx, registerer, registry)
client.InitMetrics(registerer)
vfs.InitMetrics(registerer)
u := url.URL{Scheme: "http", Host: metricsAddr, Path: "/metrics"}
resp, err := http.Get(u.String())
So(err, ShouldBeNil)
Expand Down
48 changes: 41 additions & 7 deletions cmd/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,15 @@ import (
"path/filepath"
"regexp"
"runtime"
"strconv"
"strings"

"github.com/juicedata/juicefs/pkg/metric"
"github.com/juicedata/juicefs/pkg/object"
"github.com/juicedata/juicefs/pkg/sync"
"github.com/juicedata/juicefs/pkg/utils"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors"
"github.com/urfave/cli/v2"
)

Expand Down Expand Up @@ -78,14 +83,18 @@ Supported storage systems: https://juicefs.com/docs/community/how_to_setup_objec
syncActionFlags(),
syncStorageFlags(),
clusterFlags(),
[]cli.Flag{
&cli.IntFlag{
Name: "http-port",
Value: 6070,
Hidden: true,
Usage: "HTTP `PORT` to listen to",
addCategories("METRICS", []cli.Flag{
&cli.StringFlag{
Name: "metrics",
Value: "127.0.0.1:9567",
Usage: "address to export metrics",
},
},
&cli.StringFlag{
Name: "consul",
Value: "127.0.0.1:8500",
Usage: "consul address to register",
},
}),
),
}
}
Expand Down Expand Up @@ -423,5 +432,30 @@ func doSync(c *cli.Context) error {
os.SetStorageClass(config.StorageClass)
}
}

if config.Manager == "" && !config.Dry {
var srcPath, dstPath string
if strings.HasPrefix(src.String(), "file://") {
srcPath = src.String()
}
if strings.HasPrefix(dst.String(), "file://") {
dstPath = dst.String()
}
srcPath = utils.RemovePassword(srcPath)
dstPath = utils.RemovePassword(dstPath)
registry := prometheus.NewRegistry()
config.Registerer = prometheus.WrapRegistererWithPrefix("juicefs_sync_",
prometheus.WrapRegistererWith(prometheus.Labels{"cmd": "sync", "pid": strconv.Itoa(os.Getpid())}, registry))
config.Registerer.MustRegister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}))
config.Registerer.MustRegister(collectors.NewGoCollector())
metricsAddr := exposeMetrics(c, config.Registerer, registry)
if c.IsSet("consul") {
metadata := make(map[string]string)
metadata["src"] = srcPath
metadata["dst"] = dstPath
metadata["pid"] = strconv.Itoa(os.Getpid())
metric.RegisterToConsul(c.String("consul"), metricsAddr, metadata)
}
}
return sync.Sync(src, dst, config)
}
9 changes: 3 additions & 6 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,6 @@ require (
xorm.io/xorm v1.0.7
)

require (
github.com/rasky/go-xdr v0.0.0-20170124162913-1a41d1a06c93 // indirect
github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect
)

require (
cloud.google.com/go v0.102.1 // indirect
cloud.google.com/go/iam v0.3.0 // indirect
Expand Down Expand Up @@ -205,6 +200,7 @@ require (
github.com/pquerna/ffjson v0.0.0-20190930134022-aa0246cd15f7 // indirect
github.com/prometheus/procfs v0.9.0 // indirect
github.com/pyroscope-io/godeltaprof v0.1.2 // indirect
github.com/rasky/go-xdr v0.0.0-20170124162913-1a41d1a06c93 // indirect
github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 // indirect
github.com/rivo/uniseg v0.2.0 // indirect
github.com/rjeczalik/notify v0.9.2 // indirect
Expand All @@ -231,6 +227,7 @@ require (
github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a // indirect
github.com/willf/bitset v1.1.11 // indirect
github.com/willf/bloom v2.0.3+incompatible // indirect
github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect
go.etcd.io/etcd/api/v3 v3.5.9 // indirect
go.etcd.io/etcd/client/pkg/v3 v3.5.9 // indirect
go.opencensus.io v0.23.0 // indirect
Expand All @@ -254,7 +251,7 @@ replace github.com/hanwen/go-fuse/v2 v2.1.1-0.20210611132105-24a1dfe6b4f8 => git

replace github.com/dgrijalva/jwt-go v3.2.0+incompatible => github.com/golang-jwt/jwt v3.2.1+incompatible

replace github.com/vbauerster/mpb/v7 v7.0.3 => github.com/juicedata/mpb/v7 v7.0.4-0.20220719014258-68df1356cfba
replace github.com/vbauerster/mpb/v7 v7.0.3 => github.com/juicedata/mpb/v7 v7.0.4-0.20231024073412-2b8d31be510b

replace google.golang.org/grpc v1.43.0 => google.golang.org/grpc v1.29.0

Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -634,8 +634,8 @@ github.com/juicedata/huaweicloud-sdk-go-obs v3.22.12-0.20230228031208-386e87b5c0
github.com/juicedata/huaweicloud-sdk-go-obs v3.22.12-0.20230228031208-386e87b5c091+incompatible/go.mod h1:Ukwa8ffRQLV6QRwpqGioPjn2Wnf7TBDA4DbennDOqHE=
github.com/juicedata/minio v0.0.0-20221113011458-8866d5c9df8c h1:w+4eiZLSLd6aQcy+7wn++hI1caDAm+rNOG7Me5qO7Sw=
github.com/juicedata/minio v0.0.0-20221113011458-8866d5c9df8c/go.mod h1:8oMBmyEWA8aYwMwO7eUNOjvVNOhDeqDlio2RSv6T/4Q=
github.com/juicedata/mpb/v7 v7.0.4-0.20220719014258-68df1356cfba h1:YSCPvyONPDp/ivKgRanFpNEHh3N5/0UsKmwbbKQIuGE=
github.com/juicedata/mpb/v7 v7.0.4-0.20220719014258-68df1356cfba/go.mod h1:NXGsfPGx6G2JssqvEcULtDqUrxuuYs4llpv8W6ZUpzk=
github.com/juicedata/mpb/v7 v7.0.4-0.20231024073412-2b8d31be510b h1:0/6suPNZnrOlRlBaU/Bnitu8HiKkkLSzQhHbwQ9AysM=
github.com/juicedata/mpb/v7 v7.0.4-0.20231024073412-2b8d31be510b/go.mod h1:NXGsfPGx6G2JssqvEcULtDqUrxuuYs4llpv8W6ZUpzk=
github.com/juju/ratelimit v1.0.2 h1:sRxmtRiajbvrcLQT7S+JbqU0ntsb9W2yhSdNN8tWfaI=
github.com/juju/ratelimit v1.0.2/go.mod h1:qapgC/Gy+xNh9UxzV13HGGl/6UXNN+ct+vwSgWNm/qk=
github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
Expand Down
27 changes: 17 additions & 10 deletions pkg/metric/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ import (

consulapi "github.com/hashicorp/consul/api"
"github.com/hashicorp/go-hclog"

"github.com/juicedata/juicefs/pkg/meta"
"github.com/juicedata/juicefs/pkg/utils"
"github.com/prometheus/client_golang/prometheus"
)
Expand Down Expand Up @@ -57,7 +55,7 @@ var (
})
)

func UpdateMetrics(m meta.Meta, registerer prometheus.Registerer) {
func UpdateMetrics(registerer prometheus.Registerer) {
if registerer == nil {
return
}
Expand All @@ -66,7 +64,7 @@ func UpdateMetrics(m meta.Meta, registerer prometheus.Registerer) {
registerer.MustRegister(uptime)
}

func RegisterToConsul(consulAddr, metricsAddr, mountPoint string) {
func RegisterToConsul(consulAddr, metricsAddr string, metadata map[string]string) {
if metricsAddr == "" {
logger.Errorf("Metrics server start err,so can't register to consul")
return
Expand Down Expand Up @@ -101,14 +99,23 @@ func RegisterToConsul(consulAddr, metricsAddr, mountPoint string) {
return
}

localMeta := make(map[string]string)
hostname, err := os.Hostname()
if err != nil {
logger.Errorf("Get hostname failed:%s", err)
return
}
localMeta["hostName"] = hostname
localMeta["mountPoint"] = mountPoint
metadata["hostName"] = hostname
var id, name string
if mp, ok := metadata["mountPoint"]; ok {
id = fmt.Sprintf("%s:%s", localIp, mp)
name = "juicefs"
} else {
// for sync metrics, id format: 127.0.0.1;src->dst;pid=6666
id = fmt.Sprintf("%s;%s->%s;pid=%s", localIp, metadata["src"], metadata["dst"], metadata["pid"])
delete(metadata, "src")
delete(metadata, "dst")
name = "juicefs-sync"
}

check := &consulapi.AgentServiceCheck{
HTTP: fmt.Sprintf("http://%s:%d/metrics", localIp, port),
Expand All @@ -118,11 +125,11 @@ func RegisterToConsul(consulAddr, metricsAddr, mountPoint string) {
}

registration := consulapi.AgentServiceRegistration{
ID: fmt.Sprintf("%s:%s", localIp, mountPoint),
Name: "juicefs",
ID: id,
Name: name,
Port: port,
Address: localIp,
Meta: localMeta,
Meta: metadata,
Check: check,
}
if err = client.Agent().ServiceRegister(&registration); err != nil {
Expand Down
2 changes: 2 additions & 0 deletions pkg/sync/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"os"
"strings"

"github.com/prometheus/client_golang/prometheus"
"github.com/urfave/cli/v2"
)

Expand Down Expand Up @@ -57,6 +58,7 @@ type Config struct {

rules []rule
concurrentList chan int
Registerer prometheus.Registerer
}

func envList() []string {
Expand Down
76 changes: 76 additions & 0 deletions pkg/sync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/juicedata/juicefs/pkg/object"
"github.com/juicedata/juicefs/pkg/utils"
"github.com/juju/ratelimit"
"github.com/prometheus/client_golang/prometheus"
)

// The max number of key per listing request
Expand Down Expand Up @@ -1032,6 +1033,7 @@ func Sync(src, dst object.ObjectStorage, config *Config) error {
}
}()

initSyncMetrics(config)
for i := 0; i < config.Threads; i++ {
wg.Add(1)
go func() {
Expand Down Expand Up @@ -1108,3 +1110,77 @@ func Sync(src, dst object.ObjectStorage, config *Config) error {
}
return nil
}

func initSyncMetrics(config *Config) {
if config.Registerer != nil {
config.Registerer.MustRegister(
prometheus.NewCounterFunc(prometheus.CounterOpts{
Name: "scanned",
Help: "Scanned objects",
}, func() float64 {
return float64(handled.Total())
}),
prometheus.NewCounterFunc(prometheus.CounterOpts{
Name: "handled",
Help: "Handled objects",
}, func() float64 {
return float64(handled.Current())
}),
prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Name: "pending",
Help: "Pending objects",
}, func() float64 {
return float64(pending.Current())
}),
prometheus.NewCounterFunc(prometheus.CounterOpts{
Name: "copied",
Help: "Copied objects",
}, func() float64 {
return float64(copied.Current())
}),
prometheus.NewCounterFunc(prometheus.CounterOpts{
Name: "copied_bytes",
Help: "Copied bytes",
}, func() float64 {
return float64(copiedBytes.Current())
}),
prometheus.NewCounterFunc(prometheus.CounterOpts{
Name: "skipped",
Help: "Skipped objects",
}, func() float64 {
return float64(skipped.Current())
}),
)
if failed != nil {
config.Registerer.MustRegister(prometheus.NewCounterFunc(prometheus.CounterOpts{
Name: "failed",
Help: "Failed objects",
}, func() float64 {
return float64(failed.Current())
}))
}
if deleted != nil {
config.Registerer.MustRegister(prometheus.NewCounterFunc(prometheus.CounterOpts{
Name: "deleted",
Help: "Deleted objects",
}, func() float64 {
return float64(deleted.Current())
}))
}
if checked != nil && checkedBytes != nil {
config.Registerer.MustRegister(
prometheus.NewCounterFunc(prometheus.CounterOpts{
Name: "checked",
Help: "Checked objects",
}, func() float64 {
return float64(checked.Current())
}),
prometheus.NewCounterFunc(prometheus.CounterOpts{
Name: "checked_bytes",
Help: "Checked bytes",
}, func() float64 {
return float64(checkedBytes.Current())
}))
}
}
}
2 changes: 1 addition & 1 deletion sdk/java/libjfs/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,7 @@ func jfs_init(cname, jsonConf, user, group, superuser, supergroup *C.char) uintp
}
m.InitMetrics(registerer)
vfs.InitMetrics(registerer)
go metric.UpdateMetrics(m, registerer)
go metric.UpdateMetrics(registerer)
}

blob, err := cmd.NewReloadableStorage(format, m, func(f *meta.Format) {
Expand Down

0 comments on commit 86e69e4

Please sign in to comment.