Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add goroutine watcher #16

Merged
merged 10 commits into from
Jul 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ jobs:
test-on-macos:
strategy:
matrix:
os-version: [ 11, 12 ]
os-version: [ 12, 13 ]
go-version: [ 1.16, 1.17, 1.18, 1.19 ]
runs-on: macos-${{ matrix.os-version }}
steps:
Expand Down
106 changes: 95 additions & 11 deletions autopprof.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"bytes"
"context"
"fmt"
"github.com/daangn/autopprof/queryer"
"log"
"time"

Expand Down Expand Up @@ -34,13 +35,22 @@ type autoPprof struct {
// Default: 0.75. (mean 75%)
memThreshold float64

// goroutineThreshold is the goroutine count threshold to trigger profile.
// If the goroutine count is over the threshold, the autopprof will
// report the goroutine profile.
// Default: 50000.
goroutineThreshold int

// minConsecutiveOverThreshold is the minimum consecutive
// number of over a threshold for reporting profile again.
// Default: 12.
minConsecutiveOverThreshold int

// queryer is used to query the quota and the cgroup stat.
queryer queryer
// cgroupQueryer is used to query the quota and the cgroup stat.
cgroupQueryer queryer.CgroupsQueryer

// runtimeQueryer is used to query the runtime stat.
runtimeQueryer queryer.RuntimeQueryer

// profiler is used to profile the cpu and the heap memory.
profiler profiler
Expand All @@ -53,8 +63,9 @@ type autoPprof struct {
reportBoth bool
Copy link
Contributor Author

@jake-shin0 jake-shin0 Apr 24, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This field is an option that only makes sense while we monitor just two resources, CPU and memory; as more runtime monitors are added, it loses its meaning.

@mingrammer Please give me your opinion on how to remove it in a separate version!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should introduce a new option, reportAll, which reports all profile results when any threshold is exceeded. And deprecate the reportBoth.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea! I will address this in a separate PR.


// Flags to disable the profiling.
disableCPUProf bool
disableMemProf bool
disableCPUProf bool
disableMemProf bool
disableGoroutineProf bool

// stopC is the signal channel to stop the watch processes.
stopC chan struct{}
Expand All @@ -65,7 +76,12 @@ var globalAp *autoPprof

// Start configures and runs the autopprof process.
func Start(opt Option) error {
qryer, err := newQueryer()
cgroupQryer, err := queryer.NewCgroupQueryer()
if err != nil {
return err
}

runtimeQryer, err := queryer.NewRuntimeQueryer()
jake-shin0 marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return err
}
Expand All @@ -78,8 +94,10 @@ func Start(opt Option) error {
watchInterval: defaultWatchInterval,
cpuThreshold: defaultCPUThreshold,
memThreshold: defaultMemThreshold,
goroutineThreshold: defaultGoroutineThreshold,
minConsecutiveOverThreshold: defaultMinConsecutiveOverThreshold,
queryer: qryer,
cgroupQueryer: cgroupQryer,
runtimeQueryer: runtimeQryer,
profiler: profr,
reporter: opt.Reporter,
reportBoth: opt.ReportBoth,
Expand All @@ -93,6 +111,9 @@ func Start(opt Option) error {
if opt.MemThreshold != 0 {
ap.memThreshold = opt.MemThreshold
}
if opt.GoroutineThreshold != 0 {
ap.goroutineThreshold = opt.GoroutineThreshold
}
if !ap.disableCPUProf {
if err := ap.loadCPUQuota(); err != nil {
return err
Expand All @@ -112,7 +133,7 @@ func Stop() {
}

func (ap *autoPprof) loadCPUQuota() error {
err := ap.queryer.setCPUQuota()
err := ap.cgroupQueryer.SetCPUQuota()
if err == nil {
return nil
}
Expand All @@ -134,6 +155,7 @@ func (ap *autoPprof) loadCPUQuota() error {
// watch launches one watcher goroutine per monitored resource (CPU,
// memory, goroutine count) and then blocks until stopC is closed.
func (ap *autoPprof) watch() {
	for _, watchFn := range []func(){
		ap.watchCPUUsage,
		ap.watchMemUsage,
		ap.watchGoroutineCount,
	} {
		go watchFn()
	}
	<-ap.stopC
}

Expand All @@ -149,7 +171,7 @@ func (ap *autoPprof) watchCPUUsage() {
for {
select {
case <-ticker.C:
usage, err := ap.queryer.cpuUsage()
usage, err := ap.cgroupQueryer.CPUUsage()
if err != nil {
log.Println(err)
return
Expand All @@ -170,7 +192,7 @@ func (ap *autoPprof) watchCPUUsage() {
))
}
if ap.reportBoth && !ap.disableMemProf {
memUsage, err := ap.queryer.memUsage()
memUsage, err := ap.cgroupQueryer.MemUsage()
if err != nil {
log.Println(err)
return
Expand Down Expand Up @@ -226,7 +248,7 @@ func (ap *autoPprof) watchMemUsage() {
for {
select {
case <-ticker.C:
usage, err := ap.queryer.memUsage()
usage, err := ap.cgroupQueryer.MemUsage()
if err != nil {
log.Println(err)
return
Expand All @@ -247,7 +269,7 @@ func (ap *autoPprof) watchMemUsage() {
))
}
if ap.reportBoth && !ap.disableCPUProf {
cpuUsage, err := ap.queryer.cpuUsage()
cpuUsage, err := ap.cgroupQueryer.CPUUsage()
if err != nil {
log.Println(err)
return
Expand Down Expand Up @@ -291,6 +313,68 @@ func (ap *autoPprof) reportHeapProfile(memUsage float64) error {
return nil
}

// watchGoroutineCount polls the runtime goroutine count on every watch
// interval tick and reports a goroutine profile when the count reaches
// the configured threshold. It returns immediately when goroutine
// profiling is disabled, and exits once stopC is closed.
func (ap *autoPprof) watchGoroutineCount() {
	if ap.disableGoroutineProf {
		return
	}

	ticker := time.NewTicker(ap.watchInterval)
	defer ticker.Stop()

	overCnt := 0
	for {
		select {
		case <-ap.stopC:
			return
		case <-ticker.C:
			n := ap.runtimeQueryer.GoroutineCount()
			if n < ap.goroutineThreshold {
				// Back under the threshold: clear the streak so the
				// next breach reports immediately.
				overCnt = 0
				continue
			}

			// Report only at the start of an over-threshold streak;
			// while the count stays high, duplicate reports are
			// suppressed to avoid flooding the reporter.
			if overCnt == 0 {
				if err := ap.reportGoroutineProfile(n); err != nil {
					log.Println(fmt.Errorf(
						"autopprof: failed to report the goroutine profile: %w", err,
					))
				}
			}

			overCnt++
			if overCnt >= ap.minConsecutiveOverThreshold {
				// The streak has lasted long enough; reset so the
				// profile is reported again on the next tick that is
				// still over the threshold.
				overCnt = 0
			}
		}
	}
}

// reportGoroutineProfile captures a goroutine profile and forwards it to
// the configured reporter along with the observed goroutine count and
// the configured threshold. The report is bounded by reportTimeout.
func (ap *autoPprof) reportGoroutineProfile(goroutineCount int) error {
	prof, err := ap.profiler.profileGoroutine()
	if err != nil {
		return fmt.Errorf("autopprof: failed to profile the goroutine: %w", err)
	}

	ctx, cancel := context.WithTimeout(context.Background(), reportTimeout)
	defer cancel()

	info := report.GoroutineInfo{
		ThresholdCount: ap.goroutineThreshold,
		Count:          goroutineCount,
	}
	return ap.reporter.ReportGoroutineProfile(ctx, bytes.NewReader(prof), info)
}

// stop signals every watch goroutine to exit by closing stopC.
// It must be called at most once; closing an already-closed channel panics.
func (ap *autoPprof) stop() {
	close(ap.stopC)
}
Loading
Loading