From c3875336e9decb09e2510c0dd00ccc72d9f7af7a Mon Sep 17 00:00:00 2001 From: Injun Song Date: Fri, 23 Feb 2024 23:06:37 +0900 Subject: [PATCH] feat(benchmark): use gops to diagnose benchmark tool This pull request adds the gops agent into the benchmark tool, making it easier to diagnose. Refs: - https://github.com/google/gops --- cmd/benchmark/test_command.go | 28 +- go.mod | 1 + go.sum | 2 + internal/benchmark/loader.go | 5 +- vendor/github.com/google/gops/LICENSE | 27 ++ vendor/github.com/google/gops/agent/agent.go | 284 ++++++++++++++++++ .../google/gops/agent/sockopt_reuseport.go | 37 +++ .../google/gops/agent/sockopt_unsupported.go | 14 + .../google/gops/internal/internal.go | 61 ++++ .../github.com/google/gops/signal/signal.go | 38 +++ vendor/modules.txt | 5 + 11 files changed, 493 insertions(+), 9 deletions(-) create mode 100644 vendor/github.com/google/gops/LICENSE create mode 100644 vendor/github.com/google/gops/agent/agent.go create mode 100644 vendor/github.com/google/gops/agent/sockopt_reuseport.go create mode 100644 vendor/github.com/google/gops/agent/sockopt_unsupported.go create mode 100644 vendor/github.com/google/gops/internal/internal.go create mode 100644 vendor/github.com/google/gops/signal/signal.go diff --git a/cmd/benchmark/test_command.go b/cmd/benchmark/test_command.go index 3e61264ff..275e1d137 100644 --- a/cmd/benchmark/test_command.go +++ b/cmd/benchmark/test_command.go @@ -4,6 +4,7 @@ import ( "fmt" "strings" + "github.com/google/gops/agent" "github.com/urfave/cli/v2" "github.com/kakao/varlog/internal/benchmark" @@ -45,6 +46,12 @@ var ( Category: "Common: ", Usage: "Use single connection shared by appenders in a target. Each target uses different connection.", } + flagGopsAddr = &cli.StringFlag{ + Name: "gops-addr", + Category: "Common: ", + Value: ":0", + Usage: "The address of gops agent", + } flagAppenders = &cli.UintSliceFlag{ Name: "appenders", @@ -94,17 +101,21 @@ func newCommandTest() *cli.Command { Usage: "run benchmark test", Flags: []cli.Flag{ flagClusterID, + flagTarget, flagMRAddrs, - flagMsgSize, - flagBatchSize, - flagAppenders, - flagSubscribers, flagDuration, flagReportInterval, flagPrintJSON, - flagPipelineSize, flagSingleConnPerTarget, + flagGopsAddr, + + flagAppenders, + flagMsgSize, + flagBatchSize, + flagPipelineSize, + + flagSubscribers, flagSubscribeSize, }, Action: runCommandTest, @@ -116,6 +127,13 @@ func runCommandTest(c *cli.Context) error { return fmt.Errorf("unexpected args: %v", c.Args().Slice()) } + if err := agent.Listen(agent.Options{ + Addr: c.String(flagGopsAddr.Name), + }); err != nil { + return err + } + defer agent.Close() + clusterID, err := types.ParseClusterID(c.String(flagClusterID.Name)) if err != nil { return err diff --git a/go.mod b/go.mod index c7721c484..e782e959c 100644 --- a/go.mod +++ b/go.mod @@ -8,6 +8,7 @@ require ( github.com/gogo/protobuf v1.3.2 github.com/gogo/status v1.1.1 github.com/golang/protobuf v1.5.3 + github.com/google/gops v0.3.28 github.com/lib/pq v1.10.9 github.com/pkg/errors v0.9.1 github.com/puzpuzpuz/xsync/v2 v2.5.1 diff --git a/go.sum b/go.sum index 1ec15f4eb..8a56e4498 100644 --- a/go.sum +++ b/go.sum @@ -179,6 +179,8 @@ github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeN github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/gofuzz v1.2.0 h1:xRy4A+RhZaiKjJ1bPfwQ8sedCA+YS2YcCHW6ec7JMi0= github.com/google/gofuzz v1.2.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/google/gops v0.3.28 h1:2Xr57tqKAmQYRAfG12E+yLcoa2Y42UJo2lOrUFL9ark= +github.com/google/gops v0.3.28/go.mod h1:6f6+Nl8LcHrzJwi8+p0ii+vmBFSlB4f8cOOkTJ7sk4c= github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs= github.com/google/martian/v3 v3.0.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0= github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc= diff --git a/internal/benchmark/loader.go b/internal/benchmark/loader.go index e1b83ecdf..57356081b 100644 --- a/internal/benchmark/loader.go +++ b/internal/benchmark/loader.go @@ -248,11 +248,8 @@ func (loader *Loader) subscribeLoop(ctx context.Context, c varlog.Log) error { } func (loader *Loader) subscribeLoopInternal(ctx context.Context, c varlog.Log, first, last varlogpb.LogSequenceNumber) error { - subscribeSize := types.GLSN(loader.SubscribeSize) - - loader.logger.Info("subscribe", slog.Any("first", first), slog.Any("last", last), slog.Int("subscribeSize", loader.SubscribeSize)) - var sm SubscribeMetrics + subscribeSize := types.GLSN(loader.SubscribeSize) begin := first.GLSN end := min(begin+subscribeSize, last.GLSN+1) for begin < end { diff --git a/vendor/github.com/google/gops/LICENSE b/vendor/github.com/google/gops/LICENSE new file mode 100644 index 000000000..55e52a010 --- /dev/null +++ b/vendor/github.com/google/gops/LICENSE @@ -0,0 +1,27 @@ +Copyright (c) 2016 The Go Authors. All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are +met: + + * Redistributions of source code must retain the above copyright +notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above +copyright notice, this list of conditions and the following disclaimer +in the documentation and/or other materials provided with the +distribution. + * Neither the name of Google Inc. nor the names of its +contributors may be used to endorse or promote products derived from +this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/vendor/github.com/google/gops/agent/agent.go b/vendor/github.com/google/gops/agent/agent.go new file mode 100644 index 000000000..b7978069d --- /dev/null +++ b/vendor/github.com/google/gops/agent/agent.go @@ -0,0 +1,284 @@ +// Copyright 2016 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package agent provides hooks programs can register to retrieve +// diagnostics data by using gops. +package agent + +import ( + "bufio" + "context" + "encoding/binary" + "fmt" + "io" + "net" + "os" + gosignal "os/signal" + "path/filepath" + "runtime" + "runtime/debug" + "runtime/pprof" + "runtime/trace" + "strconv" + "strings" + "sync" + "syscall" + "time" + + "github.com/google/gops/internal" + "github.com/google/gops/signal" +) + +const defaultAddr = "127.0.0.1:0" + +var ( + mu sync.Mutex + portfile string + listener net.Listener + + units = []string{" bytes", "KB", "MB", "GB", "TB", "PB"} +) + +// Options allows configuring the started agent. +type Options struct { + // Addr is the host:port the agent will be listening at. + // Optional. + Addr string + + // ConfigDir is the directory to store the configuration file, + // PID of the gops process, filename, port as well as content. + // Optional. + ConfigDir string + + // ShutdownCleanup automatically cleans up resources if the + // running process receives an interrupt. Otherwise, users + // can call Close before shutting down. + // Optional. + ShutdownCleanup bool + + // ReuseSocketAddrAndPort determines whether the SO_REUSEADDR and + // SO_REUSEPORT socket options should be set on the listening socket of + // the agent. This option is only effective on unix-like OSes and if + // Addr is set to a fixed host:port. + // Optional. + ReuseSocketAddrAndPort bool +} + +// Listen starts the gops agent on a host process. Once agent started, users +// can use the advanced gops features. The agent will listen to Interrupt +// signals and exit the process, if you need to perform further work on the +// Interrupt signal use the options parameter to configure the agent +// accordingly. +// +// Note: The agent exposes an endpoint via a TCP connection that can be used by +// any program on the system. Review your security requirements before starting +// the agent. +func Listen(opts Options) error { + mu.Lock() + defer mu.Unlock() + + if portfile != "" { + return fmt.Errorf("gops: agent already listening at: %v", listener.Addr()) + } + + // new + gopsdir := opts.ConfigDir + if gopsdir == "" { + cfgDir, err := internal.ConfigDir() + if err != nil { + return err + } + gopsdir = cfgDir + } + + err := os.MkdirAll(gopsdir, os.ModePerm) + if err != nil { + return err + } + if opts.ShutdownCleanup { + gracefulShutdown() + } + + addr := opts.Addr + if addr == "" { + addr = defaultAddr + } + var lc net.ListenConfig + if opts.ReuseSocketAddrAndPort { + lc.Control = setReuseAddrAndPortSockopts + } + listener, err = lc.Listen(context.Background(), "tcp", addr) + if err != nil { + return err + } + port := listener.Addr().(*net.TCPAddr).Port + portfile = filepath.Join(gopsdir, strconv.Itoa(os.Getpid())) + err = os.WriteFile(portfile, []byte(strconv.Itoa(port)), os.ModePerm) + if err != nil { + return err + } + + go listen(listener) + return nil +} + +func listen(l net.Listener) { + buf := make([]byte, 1) + for { + fd, err := l.Accept() + if err != nil { + // No great way to check for this, see https://golang.org/issues/4373. + if !strings.Contains(err.Error(), "use of closed network connection") { + fmt.Fprintf(os.Stderr, "gops: %v\n", err) + } + if netErr, ok := err.(net.Error); ok && !netErr.Temporary() { + break + } + continue + } + if _, err := fd.Read(buf); err != nil { + fmt.Fprintf(os.Stderr, "gops: %v\n", err) + continue + } + if err := handle(fd, buf); err != nil { + fmt.Fprintf(os.Stderr, "gops: %v\n", err) + continue + } + fd.Close() + } +} + +func gracefulShutdown() { + c := make(chan os.Signal, 1) + gosignal.Notify(c, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT) + go func() { + // cleanup the socket on shutdown. + sig := <-c + Close() + ret := 1 + if sig == syscall.SIGTERM { + ret = 0 + } + os.Exit(ret) + }() +} + +// Close closes the agent, removing temporary files and closing the TCP listener. +// If no agent is listening, Close does nothing. +func Close() { + mu.Lock() + defer mu.Unlock() + + if portfile != "" { + os.Remove(portfile) + portfile = "" + } + if listener != nil { + listener.Close() + } +} + +func formatBytes(val uint64) string { + var i int + var target uint64 + for i = range units { + target = 1 << uint(10*(i+1)) + if val < target { + break + } + } + if i > 0 { + return fmt.Sprintf("%0.2f%s (%d bytes)", float64(val)/(float64(target)/1024), units[i], val) + } + return fmt.Sprintf("%d bytes", val) +} + +func handle(conn io.ReadWriter, msg []byte) error { + switch msg[0] { + case signal.StackTrace: + return pprof.Lookup("goroutine").WriteTo(conn, 2) + case signal.GC: + runtime.GC() + _, err := conn.Write([]byte("ok")) + return err + case signal.MemStats: + var s runtime.MemStats + runtime.ReadMemStats(&s) + fmt.Fprintf(conn, "alloc: %v\n", formatBytes(s.Alloc)) + fmt.Fprintf(conn, "total-alloc: %v\n", formatBytes(s.TotalAlloc)) + fmt.Fprintf(conn, "sys: %v\n", formatBytes(s.Sys)) + fmt.Fprintf(conn, "lookups: %v\n", s.Lookups) + fmt.Fprintf(conn, "mallocs: %v\n", s.Mallocs) + fmt.Fprintf(conn, "frees: %v\n", s.Frees) + fmt.Fprintf(conn, "heap-alloc: %v\n", formatBytes(s.HeapAlloc)) + fmt.Fprintf(conn, "heap-sys: %v\n", formatBytes(s.HeapSys)) + fmt.Fprintf(conn, "heap-idle: %v\n", formatBytes(s.HeapIdle)) + fmt.Fprintf(conn, "heap-in-use: %v\n", formatBytes(s.HeapInuse)) + fmt.Fprintf(conn, "heap-released: %v\n", formatBytes(s.HeapReleased)) + fmt.Fprintf(conn, "heap-objects: %v\n", s.HeapObjects) + fmt.Fprintf(conn, "stack-in-use: %v\n", formatBytes(s.StackInuse)) + fmt.Fprintf(conn, "stack-sys: %v\n", formatBytes(s.StackSys)) + fmt.Fprintf(conn, "stack-mspan-inuse: %v\n", formatBytes(s.MSpanInuse)) + fmt.Fprintf(conn, "stack-mspan-sys: %v\n", formatBytes(s.MSpanSys)) + fmt.Fprintf(conn, "stack-mcache-inuse: %v\n", formatBytes(s.MCacheInuse)) + fmt.Fprintf(conn, "stack-mcache-sys: %v\n", formatBytes(s.MCacheSys)) + fmt.Fprintf(conn, "other-sys: %v\n", formatBytes(s.OtherSys)) + fmt.Fprintf(conn, "gc-sys: %v\n", formatBytes(s.GCSys)) + fmt.Fprintf(conn, "next-gc: when heap-alloc >= %v\n", formatBytes(s.NextGC)) + lastGC := "-" + if s.LastGC != 0 { + lastGC = fmt.Sprint(time.Unix(0, int64(s.LastGC))) + } + fmt.Fprintf(conn, "last-gc: %v\n", lastGC) + fmt.Fprintf(conn, "gc-pause-total: %v\n", time.Duration(s.PauseTotalNs)) + fmt.Fprintf(conn, "gc-pause: %v\n", s.PauseNs[(s.NumGC+255)%256]) + fmt.Fprintf(conn, "gc-pause-end: %v\n", s.PauseEnd[(s.NumGC+255)%256]) + fmt.Fprintf(conn, "num-gc: %v\n", s.NumGC) + fmt.Fprintf(conn, "num-forced-gc: %v\n", s.NumForcedGC) + fmt.Fprintf(conn, "gc-cpu-fraction: %v\n", s.GCCPUFraction) + fmt.Fprintf(conn, "enable-gc: %v\n", s.EnableGC) + fmt.Fprintf(conn, "debug-gc: %v\n", s.DebugGC) + case signal.Version: + fmt.Fprintf(conn, "%v\n", runtime.Version()) + case signal.HeapProfile: + return pprof.WriteHeapProfile(conn) + case signal.CPUProfile: + if err := pprof.StartCPUProfile(conn); err != nil { + return err + } + time.Sleep(30 * time.Second) + pprof.StopCPUProfile() + case signal.Stats: + fmt.Fprintf(conn, "goroutines: %v\n", runtime.NumGoroutine()) + fmt.Fprintf(conn, "OS threads: %v\n", pprof.Lookup("threadcreate").Count()) + fmt.Fprintf(conn, "GOMAXPROCS: %v\n", runtime.GOMAXPROCS(0)) + fmt.Fprintf(conn, "num CPU: %v\n", runtime.NumCPU()) + case signal.BinaryDump: + path, err := os.Executable() + if err != nil { + return err + } + f, err := os.Open(path) + if err != nil { + return err + } + defer f.Close() + + _, err = bufio.NewReader(f).WriteTo(conn) + return err + case signal.Trace: + if err := trace.Start(conn); err != nil { + return err + } + time.Sleep(5 * time.Second) + trace.Stop() + case signal.SetGCPercent: + perc, err := binary.ReadVarint(bufio.NewReader(conn)) + if err != nil { + return err + } + fmt.Fprintf(conn, "New GC percent set to %v. Previous value was %v.\n", perc, debug.SetGCPercent(int(perc))) + } + return nil +} diff --git a/vendor/github.com/google/gops/agent/sockopt_reuseport.go b/vendor/github.com/google/gops/agent/sockopt_reuseport.go new file mode 100644 index 000000000..8311fe399 --- /dev/null +++ b/vendor/github.com/google/gops/agent/sockopt_reuseport.go @@ -0,0 +1,37 @@ +// Copyright 2020 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +//go:build !js && !plan9 && !solaris && !windows +// +build !js,!plan9,!solaris,!windows + +package agent + +import ( + "syscall" + + "golang.org/x/sys/unix" +) + +// setReuseAddrAndPortSockopts sets the SO_REUSEADDR and SO_REUSEPORT socket +// options on c's underlying socket in order to increase the chance to re-bind() +// to the same address and port upon agent restart. +func setReuseAddrAndPortSockopts(network, address string, c syscall.RawConn) error { + var soerr error + if err := c.Control(func(su uintptr) { + sock := int(su) + // Allow reuse of recently-used addresses. This socket option is + // set by default on listeners in Go's net package, see + // net.setDefaultSockopts. + soerr = unix.SetsockoptInt(sock, unix.SOL_SOCKET, unix.SO_REUSEADDR, 1) + if soerr != nil { + return + } + // Allow reuse of recently-used ports. This gives the agent a + // better chance to re-bind upon restarts. + soerr = unix.SetsockoptInt(sock, unix.SOL_SOCKET, unix.SO_REUSEPORT, 1) + }); err != nil { + return err + } + return soerr +} diff --git a/vendor/github.com/google/gops/agent/sockopt_unsupported.go b/vendor/github.com/google/gops/agent/sockopt_unsupported.go new file mode 100644 index 000000000..2493e8cad --- /dev/null +++ b/vendor/github.com/google/gops/agent/sockopt_unsupported.go @@ -0,0 +1,14 @@ +// Copyright 2020 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +//go:build (js && wasm) || plan9 || solaris || windows +// +build js,wasm plan9 solaris windows + +package agent + +import "syscall" + +func setReuseAddrAndPortSockopts(network, address string, c syscall.RawConn) error { + return nil +} diff --git a/vendor/github.com/google/gops/internal/internal.go b/vendor/github.com/google/gops/internal/internal.go new file mode 100644 index 000000000..7e3492aae --- /dev/null +++ b/vendor/github.com/google/gops/internal/internal.go @@ -0,0 +1,61 @@ +// Copyright 2017 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package internal + +import ( + "errors" + "os" + "os/user" + "path/filepath" + "strconv" + "strings" +) + +const gopsConfigDirEnvKey = "GOPS_CONFIG_DIR" + +func ConfigDir() (string, error) { + if configDir := os.Getenv(gopsConfigDirEnvKey); configDir != "" { + return configDir, nil + } + + if userConfigDir, err := os.UserConfigDir(); err == nil { + return filepath.Join(userConfigDir, "gops"), nil + } + + homeDir := guessUnixHomeDir() + if homeDir == "" { + return "", errors.New("unable to get current user home directory: os/user lookup failed; $HOME is empty") + } + return filepath.Join(homeDir, ".config", "gops"), nil +} + +func guessUnixHomeDir() string { + usr, err := user.Current() + if err == nil { + return usr.HomeDir + } + return os.Getenv("HOME") +} + +func PIDFile(pid int) (string, error) { + gopsdir, err := ConfigDir() + if err != nil { + return "", err + } + return filepath.Join(gopsdir, strconv.Itoa(pid)), nil +} + +func GetPort(pid int) (string, error) { + portfile, err := PIDFile(pid) + if err != nil { + return "", err + } + b, err := os.ReadFile(portfile) + if err != nil { + return "", err + } + port := strings.TrimSpace(string(b)) + return port, nil +} diff --git a/vendor/github.com/google/gops/signal/signal.go b/vendor/github.com/google/gops/signal/signal.go new file mode 100644 index 000000000..c70764a0f --- /dev/null +++ b/vendor/github.com/google/gops/signal/signal.go @@ -0,0 +1,38 @@ +// Copyright 2016 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package signal contains signals used to communicate to the gops agents. +package signal + +const ( + // StackTrace represents a command to print stack trace. + StackTrace = byte(0x1) + + // GC runs the garbage collector. + GC = byte(0x2) + + // MemStats reports memory stats. + MemStats = byte(0x3) + + // Version prints the Go version. + Version = byte(0x4) + + // HeapProfile starts `go tool pprof` with the current memory profile. + HeapProfile = byte(0x5) + + // CPUProfile starts `go tool pprof` with the current CPU profile + CPUProfile = byte(0x6) + + // Stats returns Go runtime statistics such as number of goroutines, GOMAXPROCS, and NumCPU. + Stats = byte(0x7) + + // Trace starts the Go execution tracer, waits 5 seconds and launches the trace tool. + Trace = byte(0x8) + + // BinaryDump returns running binary file. + BinaryDump = byte(0x9) + + // SetGCPercent sets the garbage collection target percentage. + SetGCPercent = byte(0x10) +) diff --git a/vendor/modules.txt b/vendor/modules.txt index e26effabc..2ef9c763d 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -174,6 +174,11 @@ github.com/google/gnostic-models/openapiv3 ## explicit; go 1.12 github.com/google/gofuzz github.com/google/gofuzz/bytesource +# github.com/google/gops v0.3.28 +## explicit; go 1.17 +github.com/google/gops/agent +github.com/google/gops/internal +github.com/google/gops/signal # github.com/google/uuid v1.4.0 ## explicit github.com/google/uuid