diff --git a/cmd/mdtest.go b/cmd/mdtest.go index 9db5c0551336..37c2a6dff898 100644 --- a/cmd/mdtest.go +++ b/cmd/mdtest.go @@ -25,13 +25,13 @@ import ( "sync" "time" - "github.com/juicedata/juicefs/pkg/utils" - "github.com/mattn/go-isatty" - "github.com/juicedata/juicefs/pkg/chunk" "github.com/juicedata/juicefs/pkg/fs" "github.com/juicedata/juicefs/pkg/meta" "github.com/juicedata/juicefs/pkg/metric" + "github.com/juicedata/juicefs/pkg/utils" + "github.com/juicedata/juicefs/pkg/vfs" + "github.com/mattn/go-isatty" "github.com/urfave/cli/v2" ) @@ -221,9 +221,13 @@ func initForMdtest(c *cli.Context, mp string, metaUrl string) *fs.FileSystem { conf.EntryTimeout = time.Millisecond * time.Duration(c.Float64("entry-cache")*1000) conf.DirEntryTimeout = time.Millisecond * time.Duration(c.Float64("dir-entry-cache")*1000) - metricsAddr := exposeMetrics(c, m, registerer, registry) + metricsAddr := exposeMetrics(c, registerer, registry) + m.InitMetrics(registerer) + vfs.InitMetrics(registerer) if c.IsSet("consul") { - metric.RegisterToConsul(c.String("consul"), metricsAddr, conf.Meta.MountPoint) + metadata := make(map[string]string) + metadata["mountPoint"] = conf.Meta.MountPoint + metric.RegisterToConsul(c.String("consul"), metricsAddr, metadata) } jfs, err := fs.NewFileSystem(conf, m, store) if err != nil { diff --git a/cmd/mount.go b/cmd/mount.go index dbaf8b98ab6c..1e79cb5bd2f7 100644 --- a/cmd/mount.go +++ b/cmd/mount.go @@ -103,17 +103,14 @@ func installHandler(mp string) { }() } -func exposeMetrics(c *cli.Context, m meta.Meta, registerer prometheus.Registerer, registry *prometheus.Registry) string { +func exposeMetrics(c *cli.Context, registerer prometheus.Registerer, registry *prometheus.Registry) string { var ip, port string //default set ip, port, err := net.SplitHostPort(c.String("metrics")) if err != nil { logger.Fatalf("metrics format error: %v", err) } - - m.InitMetrics(registerer) - vfs.InitMetrics(registerer) - go metric.UpdateMetrics(m, registerer) + go metric.UpdateMetrics(registerer) http.Handle("/metrics", promhttp.HandlerFor( registry, promhttp.HandlerOpts{ @@ -424,10 +421,14 @@ func getChunkConf(c *cli.Context, format *meta.Format) *chunk.Config { } func initBackgroundTasks(c *cli.Context, vfsConf *vfs.Config, metaConf *meta.Config, m meta.Meta, blob object.ObjectStorage, registerer prometheus.Registerer, registry *prometheus.Registry) { - metricsAddr := exposeMetrics(c, m, registerer, registry) + metricsAddr := exposeMetrics(c, registerer, registry) + m.InitMetrics(registerer) + vfs.InitMetrics(registerer) vfsConf.Port.PrometheusAgent = metricsAddr if c.IsSet("consul") { - metric.RegisterToConsul(c.String("consul"), metricsAddr, vfsConf.Meta.MountPoint) + metadata := make(map[string]string) + metadata["mountPoint"] = vfsConf.Meta.MountPoint + metric.RegisterToConsul(c.String("consul"), metricsAddr, metadata) vfsConf.Port.ConsulAddr = c.String("consul") } if !metaConf.ReadOnly && !metaConf.NoBGJob && vfsConf.BackupMeta > 0 { diff --git a/cmd/mount_test.go b/cmd/mount_test.go index 2bc66792b11b..a40d0f710250 100644 --- a/cmd/mount_test.go +++ b/cmd/mount_test.go @@ -73,8 +73,9 @@ func Test_exposeMetrics(t *testing.T) { defer isSetPatches.Reset() ResetHttp() registerer, registry := wrapRegister("test", "test") - metricsAddr := exposeMetrics(appCtx, client, registerer, registry) - + metricsAddr := exposeMetrics(appCtx, registerer, registry) + client.InitMetrics(registerer) + vfs.InitMetrics(registerer) u := url.URL{Scheme: "http", Host: metricsAddr, Path: "/metrics"} resp, err := http.Get(u.String()) So(err, ShouldBeNil) diff --git a/cmd/sync.go b/cmd/sync.go index ca543a148db8..553ad33e5ec2 100644 --- a/cmd/sync.go +++ b/cmd/sync.go @@ -25,10 +25,15 @@ import ( "path/filepath" "regexp" "runtime" + "strconv" "strings" + "github.com/juicedata/juicefs/pkg/metric" "github.com/juicedata/juicefs/pkg/object" "github.com/juicedata/juicefs/pkg/sync" + "github.com/juicedata/juicefs/pkg/utils" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/collectors" "github.com/urfave/cli/v2" ) @@ -78,14 +83,18 @@ Supported storage systems: https://juicefs.com/docs/community/how_to_setup_objec syncActionFlags(), syncStorageFlags(), clusterFlags(), - []cli.Flag{ - &cli.IntFlag{ - Name: "http-port", - Value: 6070, - Hidden: true, - Usage: "HTTP `PORT` to listen to", + addCategories("METRICS", []cli.Flag{ + &cli.StringFlag{ + Name: "metrics", + Value: "127.0.0.1:9567", + Usage: "address to export metrics", }, - }, + &cli.StringFlag{ + Name: "consul", + Value: "127.0.0.1:8500", + Usage: "consul address to register", + }, + }), ), } } @@ -423,5 +432,30 @@ func doSync(c *cli.Context) error { os.SetStorageClass(config.StorageClass) } } + + if config.Manager == "" && !config.Dry { + var srcPath, dstPath string + if strings.HasPrefix(src.String(), "file://") { + srcPath = src.String() + } + if strings.HasPrefix(dst.String(), "file://") { + dstPath = dst.String() + } + srcPath = utils.RemovePassword(srcPath) + dstPath = utils.RemovePassword(dstPath) + registry := prometheus.NewRegistry() + config.Registerer = prometheus.WrapRegistererWithPrefix("juicefs_sync_", + prometheus.WrapRegistererWith(prometheus.Labels{"cmd": "sync", "pid": strconv.Itoa(os.Getpid())}, registry)) + config.Registerer.MustRegister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{})) + config.Registerer.MustRegister(collectors.NewGoCollector()) + metricsAddr := exposeMetrics(c, config.Registerer, registry) + if c.IsSet("consul") { + metadata := make(map[string]string) + metadata["src"] = srcPath + metadata["dst"] = dstPath + metadata["pid"] = strconv.Itoa(os.Getpid()) + metric.RegisterToConsul(c.String("consul"), metricsAddr, metadata) + } + } return sync.Sync(src, dst, config) } diff --git a/go.mod b/go.mod index daade1ce3a36..3935d8a93a5c 100644 --- a/go.mod +++ b/go.mod @@ -81,11 +81,6 @@ require ( xorm.io/xorm v1.0.7 ) -require ( - github.com/rasky/go-xdr v0.0.0-20170124162913-1a41d1a06c93 // indirect - github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect -) - require ( cloud.google.com/go v0.102.1 // indirect cloud.google.com/go/iam v0.3.0 // indirect @@ -205,6 +200,7 @@ require ( github.com/pquerna/ffjson v0.0.0-20190930134022-aa0246cd15f7 // indirect github.com/prometheus/procfs v0.9.0 // indirect github.com/pyroscope-io/godeltaprof v0.1.2 // indirect + github.com/rasky/go-xdr v0.0.0-20170124162913-1a41d1a06c93 // indirect github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 // indirect github.com/rivo/uniseg v0.2.0 // indirect github.com/rjeczalik/notify v0.9.2 // indirect @@ -231,6 +227,7 @@ require ( github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a // indirect github.com/willf/bitset v1.1.11 // indirect github.com/willf/bloom v2.0.3+incompatible // indirect + github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect go.etcd.io/etcd/api/v3 v3.5.9 // indirect go.etcd.io/etcd/client/pkg/v3 v3.5.9 // indirect go.opencensus.io v0.23.0 // indirect @@ -254,7 +251,7 @@ replace github.com/hanwen/go-fuse/v2 v2.1.1-0.20210611132105-24a1dfe6b4f8 => git replace github.com/dgrijalva/jwt-go v3.2.0+incompatible => github.com/golang-jwt/jwt v3.2.1+incompatible -replace github.com/vbauerster/mpb/v7 v7.0.3 => github.com/juicedata/mpb/v7 v7.0.4-0.20220719014258-68df1356cfba +replace github.com/vbauerster/mpb/v7 v7.0.3 => github.com/juicedata/mpb/v7 v7.0.4-0.20231024073412-2b8d31be510b replace google.golang.org/grpc v1.43.0 => google.golang.org/grpc v1.29.0 diff --git a/go.sum b/go.sum index 199e8d6355e6..8ee854756fb6 100644 --- a/go.sum +++ b/go.sum @@ -634,8 +634,8 @@ github.com/juicedata/huaweicloud-sdk-go-obs v3.22.12-0.20230228031208-386e87b5c0 github.com/juicedata/huaweicloud-sdk-go-obs v3.22.12-0.20230228031208-386e87b5c091+incompatible/go.mod h1:Ukwa8ffRQLV6QRwpqGioPjn2Wnf7TBDA4DbennDOqHE= github.com/juicedata/minio v0.0.0-20221113011458-8866d5c9df8c h1:w+4eiZLSLd6aQcy+7wn++hI1caDAm+rNOG7Me5qO7Sw= github.com/juicedata/minio v0.0.0-20221113011458-8866d5c9df8c/go.mod h1:8oMBmyEWA8aYwMwO7eUNOjvVNOhDeqDlio2RSv6T/4Q= -github.com/juicedata/mpb/v7 v7.0.4-0.20220719014258-68df1356cfba h1:YSCPvyONPDp/ivKgRanFpNEHh3N5/0UsKmwbbKQIuGE= -github.com/juicedata/mpb/v7 v7.0.4-0.20220719014258-68df1356cfba/go.mod h1:NXGsfPGx6G2JssqvEcULtDqUrxuuYs4llpv8W6ZUpzk= +github.com/juicedata/mpb/v7 v7.0.4-0.20231024073412-2b8d31be510b h1:0/6suPNZnrOlRlBaU/Bnitu8HiKkkLSzQhHbwQ9AysM= +github.com/juicedata/mpb/v7 v7.0.4-0.20231024073412-2b8d31be510b/go.mod h1:NXGsfPGx6G2JssqvEcULtDqUrxuuYs4llpv8W6ZUpzk= github.com/juju/ratelimit v1.0.2 h1:sRxmtRiajbvrcLQT7S+JbqU0ntsb9W2yhSdNN8tWfaI= github.com/juju/ratelimit v1.0.2/go.mod h1:qapgC/Gy+xNh9UxzV13HGGl/6UXNN+ct+vwSgWNm/qk= github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= diff --git a/pkg/metric/metrics.go b/pkg/metric/metrics.go index fb785232d452..3496105a0969 100644 --- a/pkg/metric/metrics.go +++ b/pkg/metric/metrics.go @@ -25,8 +25,6 @@ import ( consulapi "github.com/hashicorp/consul/api" "github.com/hashicorp/go-hclog" - - "github.com/juicedata/juicefs/pkg/meta" "github.com/juicedata/juicefs/pkg/utils" "github.com/prometheus/client_golang/prometheus" ) @@ -57,7 +55,7 @@ var ( }) ) -func UpdateMetrics(m meta.Meta, registerer prometheus.Registerer) { +func UpdateMetrics(registerer prometheus.Registerer) { if registerer == nil { return } @@ -66,7 +64,7 @@ func UpdateMetrics(m meta.Meta, registerer prometheus.Registerer) { registerer.MustRegister(uptime) } -func RegisterToConsul(consulAddr, metricsAddr, mountPoint string) { +func RegisterToConsul(consulAddr, metricsAddr string, metadata map[string]string) { if metricsAddr == "" { logger.Errorf("Metrics server start err,so can't register to consul") return @@ -101,14 +99,23 @@ func RegisterToConsul(consulAddr, metricsAddr, mountPoint string) { return } - localMeta := make(map[string]string) hostname, err := os.Hostname() if err != nil { logger.Errorf("Get hostname failed:%s", err) return } - localMeta["hostName"] = hostname - localMeta["mountPoint"] = mountPoint + metadata["hostName"] = hostname + var id, name string + if mp, ok := metadata["mountPoint"]; ok { + id = fmt.Sprintf("%s:%s", localIp, mp) + name = "juicefs" + } else { + // for sync metrics, id format: 127.0.0.1;src->dst;pid=6666 + id = fmt.Sprintf("%s;%s->%s;pid=%s", localIp, metadata["src"], metadata["dst"], metadata["pid"]) + delete(metadata, "src") + delete(metadata, "dst") + name = "juicefs-sync" + } check := &consulapi.AgentServiceCheck{ HTTP: fmt.Sprintf("http://%s:%d/metrics", localIp, port), @@ -118,11 +125,11 @@ func RegisterToConsul(consulAddr, metricsAddr, mountPoint string) { } registration := consulapi.AgentServiceRegistration{ - ID: fmt.Sprintf("%s:%s", localIp, mountPoint), - Name: "juicefs", + ID: id, + Name: name, Port: port, Address: localIp, - Meta: localMeta, + Meta: metadata, Check: check, } if err = client.Agent().ServiceRegister(®istration); err != nil { diff --git a/pkg/sync/config.go b/pkg/sync/config.go index a3900cb3c7fe..70a5ea6b1ec4 100644 --- a/pkg/sync/config.go +++ b/pkg/sync/config.go @@ -20,6 +20,7 @@ import ( "os" "strings" + "github.com/prometheus/client_golang/prometheus" "github.com/urfave/cli/v2" ) @@ -57,6 +58,7 @@ type Config struct { rules []rule concurrentList chan int + Registerer prometheus.Registerer } func envList() []string { diff --git a/pkg/sync/sync.go b/pkg/sync/sync.go index a2a73d972faf..da737f274aff 100644 --- a/pkg/sync/sync.go +++ b/pkg/sync/sync.go @@ -31,6 +31,7 @@ import ( "github.com/juicedata/juicefs/pkg/object" "github.com/juicedata/juicefs/pkg/utils" "github.com/juju/ratelimit" + "github.com/prometheus/client_golang/prometheus" ) // The max number of key per listing request @@ -1032,6 +1033,7 @@ func Sync(src, dst object.ObjectStorage, config *Config) error { } }() + initSyncMetrics(config) for i := 0; i < config.Threads; i++ { wg.Add(1) go func() { @@ -1108,3 +1110,77 @@ func Sync(src, dst object.ObjectStorage, config *Config) error { } return nil } + +func initSyncMetrics(config *Config) { + if config.Registerer != nil { + config.Registerer.MustRegister( + prometheus.NewCounterFunc(prometheus.CounterOpts{ + Name: "scanned", + Help: "Scanned objects", + }, func() float64 { + return float64(handled.Total()) + }), + prometheus.NewCounterFunc(prometheus.CounterOpts{ + Name: "handled", + Help: "Handled objects", + }, func() float64 { + return float64(handled.Current()) + }), + prometheus.NewGaugeFunc(prometheus.GaugeOpts{ + Name: "pending", + Help: "Pending objects", + }, func() float64 { + return float64(pending.Current()) + }), + prometheus.NewCounterFunc(prometheus.CounterOpts{ + Name: "copied", + Help: "Copied objects", + }, func() float64 { + return float64(copied.Current()) + }), + prometheus.NewCounterFunc(prometheus.CounterOpts{ + Name: "copied_bytes", + Help: "Copied bytes", + }, func() float64 { + return float64(copiedBytes.Current()) + }), + prometheus.NewCounterFunc(prometheus.CounterOpts{ + Name: "skipped", + Help: "Skipped objects", + }, func() float64 { + return float64(skipped.Current()) + }), + ) + if failed != nil { + config.Registerer.MustRegister(prometheus.NewCounterFunc(prometheus.CounterOpts{ + Name: "failed", + Help: "Failed objects", + }, func() float64 { + return float64(failed.Current()) + })) + } + if deleted != nil { + config.Registerer.MustRegister(prometheus.NewCounterFunc(prometheus.CounterOpts{ + Name: "deleted", + Help: "Deleted objects", + }, func() float64 { + return float64(deleted.Current()) + })) + } + if checked != nil && checkedBytes != nil { + config.Registerer.MustRegister( + prometheus.NewCounterFunc(prometheus.CounterOpts{ + Name: "checked", + Help: "Checked objects", + }, func() float64 { + return float64(checked.Current()) + }), + prometheus.NewCounterFunc(prometheus.CounterOpts{ + Name: "checked_bytes", + Help: "Checked bytes", + }, func() float64 { + return float64(checkedBytes.Current()) + })) + } + } +} diff --git a/sdk/java/libjfs/main.go b/sdk/java/libjfs/main.go index ef638882bd06..8a88079a587e 100644 --- a/sdk/java/libjfs/main.go +++ b/sdk/java/libjfs/main.go @@ -471,7 +471,7 @@ func jfs_init(cname, jsonConf, user, group, superuser, supergroup *C.char) uintp } m.InitMetrics(registerer) vfs.InitMetrics(registerer) - go metric.UpdateMetrics(m, registerer) + go metric.UpdateMetrics(registerer) } blob, err := cmd.NewReloadableStorage(format, m, func(f *meta.Format) {