diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index d766b46..a2f706b 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -30,7 +30,7 @@ jobs: test-on-macos: strategy: matrix: - os-version: [ 11, 12 ] + os-version: [ 12, 13 ] go-version: [ 1.16, 1.17, 1.18, 1.19 ] runs-on: macos-${{ matrix.os-version }} steps: diff --git a/autopprof.go b/autopprof.go index 1c45524..17a25fb 100644 --- a/autopprof.go +++ b/autopprof.go @@ -7,6 +7,7 @@ import ( "bytes" "context" "fmt" + "github.com/daangn/autopprof/queryer" "log" "time" @@ -34,13 +35,22 @@ type autoPprof struct { // Default: 0.75. (mean 75%) memThreshold float64 + // goroutineThreshold is the goroutine count threshold to trigger profile. + // If the goroutine count is over the threshold, the autopprof will + // report the goroutine profile. + // Default: 50000. + goroutineThreshold int + // minConsecutiveOverThreshold is the minimum consecutive // number of over a threshold for reporting profile again. // Default: 12. minConsecutiveOverThreshold int - // queryer is used to query the quota and the cgroup stat. - queryer queryer + // cgroupQueryer is used to query the quota and the cgroup stat. + cgroupQueryer queryer.CgroupsQueryer + + // runtimeQueryer is used to query the runtime stat. + runtimeQueryer queryer.RuntimeQueryer // profiler is used to profile the cpu and the heap memory. profiler profiler @@ -53,8 +63,9 @@ type autoPprof struct { reportBoth bool // Flags to disable the profiling. - disableCPUProf bool - disableMemProf bool + disableCPUProf bool + disableMemProf bool + disableGoroutineProf bool // stopC is the signal channel to stop the watch processes. stopC chan struct{} @@ -65,7 +76,12 @@ var globalAp *autoPprof // Start configures and runs the autopprof process. func Start(opt Option) error { - qryer, err := newQueryer() + cgroupQryer, err := queryer.NewCgroupQueryer() + if err != nil { + return err + } + + runtimeQryer, err := queryer.NewRuntimeQueryer() if err != nil { return err } @@ -78,8 +94,10 @@ func Start(opt Option) error { watchInterval: defaultWatchInterval, cpuThreshold: defaultCPUThreshold, memThreshold: defaultMemThreshold, + goroutineThreshold: defaultGoroutineThreshold, minConsecutiveOverThreshold: defaultMinConsecutiveOverThreshold, - queryer: qryer, + cgroupQueryer: cgroupQryer, + runtimeQueryer: runtimeQryer, profiler: profr, reporter: opt.Reporter, reportBoth: opt.ReportBoth, @@ -93,6 +111,9 @@ func Start(opt Option) error { if opt.MemThreshold != 0 { ap.memThreshold = opt.MemThreshold } + if opt.GoroutineThreshold != 0 { + ap.goroutineThreshold = opt.GoroutineThreshold + } if !ap.disableCPUProf { if err := ap.loadCPUQuota(); err != nil { return err @@ -112,7 +133,7 @@ func Stop() { } func (ap *autoPprof) loadCPUQuota() error { - err := ap.queryer.setCPUQuota() + err := ap.cgroupQueryer.SetCPUQuota() if err == nil { return nil } @@ -134,6 +155,7 @@ func (ap *autoPprof) loadCPUQuota() error { func (ap *autoPprof) watch() { go ap.watchCPUUsage() go ap.watchMemUsage() + go ap.watchGoroutineCount() <-ap.stopC } @@ -149,7 +171,7 @@ func (ap *autoPprof) watchCPUUsage() { for { select { case <-ticker.C: - usage, err := ap.queryer.cpuUsage() + usage, err := ap.cgroupQueryer.CPUUsage() if err != nil { log.Println(err) return @@ -170,7 +192,7 @@ func (ap *autoPprof) watchCPUUsage() { )) } if ap.reportBoth && !ap.disableMemProf { - memUsage, err := ap.queryer.memUsage() + memUsage, err := ap.cgroupQueryer.MemUsage() if err != nil { log.Println(err) return @@ -226,7 +248,7 @@ func (ap *autoPprof) watchMemUsage() { for { select { case <-ticker.C: - usage, err := ap.queryer.memUsage() + usage, err := ap.cgroupQueryer.MemUsage() if err != nil { log.Println(err) return @@ -247,7 +269,7 @@ func (ap *autoPprof) watchMemUsage() { )) } if ap.reportBoth && !ap.disableCPUProf { - cpuUsage, err := ap.queryer.cpuUsage() + cpuUsage, err := ap.cgroupQueryer.CPUUsage() if err != nil { log.Println(err) return @@ -291,6 +313,68 @@ func (ap *autoPprof) reportHeapProfile(memUsage float64) error { return nil } +func (ap *autoPprof) watchGoroutineCount() { + if ap.disableGoroutineProf { + return + } + + ticker := time.NewTicker(ap.watchInterval) + defer ticker.Stop() + + var consecutiveOverThresholdCnt int + for { + select { + case <-ticker.C: + count := ap.runtimeQueryer.GoroutineCount() + + if count < ap.goroutineThreshold { + // Reset the count if the goroutine count goes under the threshold. + consecutiveOverThresholdCnt = 0 + continue + } + + // If goroutine count remains high for a short period of time, no + // duplicate reports are sent. + // This is to prevent the autopprof from sending too many reports. + if consecutiveOverThresholdCnt == 0 { + if err := ap.reportGoroutineProfile(count); err != nil { + log.Println(fmt.Errorf( + "autopprof: failed to report the goroutine profile: %w", err, + )) + } + } + + consecutiveOverThresholdCnt++ + if consecutiveOverThresholdCnt >= ap.minConsecutiveOverThreshold { + // Reset the count and ready to report the goroutine profile again. + consecutiveOverThresholdCnt = 0 + } + case <-ap.stopC: + return + } + } +} + +func (ap *autoPprof) reportGoroutineProfile(goroutineCount int) error { + b, err := ap.profiler.profileGoroutine() + if err != nil { + return fmt.Errorf("autopprof: failed to profile the goroutine: %w", err) + } + + ctx, cancel := context.WithTimeout(context.Background(), reportTimeout) + defer cancel() + + gi := report.GoroutineInfo{ + ThresholdCount: ap.goroutineThreshold, + Count: goroutineCount, + } + bReader := bytes.NewReader(b) + if err := ap.reporter.ReportGoroutineProfile(ctx, bReader, gi); err != nil { + return err + } + return nil +} + func (ap *autoPprof) stop() { close(ap.stopC) } diff --git a/autopprof_test.go b/autopprof_test.go index 548cf91..5903204 100644 --- a/autopprof_test.go +++ b/autopprof_test.go @@ -7,9 +7,11 @@ import ( "context" "errors" "io" + "sync" "testing" "time" + "github.com/daangn/autopprof/queryer" "github.com/golang/mock/gomock" "github.com/daangn/autopprof/report" @@ -24,8 +26,9 @@ func TestStart(t *testing.T) { { name: "disable flags are all true", opt: Option{ - DisableCPUProf: true, - DisableMemProf: true, + DisableCPUProf: true, + DisableMemProf: true, + DisableGoroutineProf: true, }, want: ErrDisableAllProfiling, }, @@ -57,6 +60,13 @@ func TestStart(t *testing.T) { }, want: ErrInvalidMemThreshold, }, + { + name: "invalid GoroutineThreshold value -1", + opt: Option{ + GoroutineThreshold: -1, + }, + want: ErrInvalidGoroutineThreshold, + }, { name: "when given reporter is nil", opt: Option{ @@ -158,13 +168,13 @@ func TestAutoPprof_loadCPUQuota(t *testing.T) { newAp: func() *autoPprof { ctrl := gomock.NewController(t) - mockQueryer := NewMockqueryer(ctrl) + mockQueryer := queryer.NewMockCgroupsQueryer(ctrl) mockQueryer.EXPECT(). - setCPUQuota(). + SetCPUQuota(). Return(nil) // Means that the quota is set correctly. return &autoPprof{ - queryer: mockQueryer, + cgroupQueryer: mockQueryer, disableCPUProf: false, disableMemProf: false, } @@ -177,13 +187,13 @@ func TestAutoPprof_loadCPUQuota(t *testing.T) { newAp: func() *autoPprof { ctrl := gomock.NewController(t) - mockQueryer := NewMockqueryer(ctrl) + mockQueryer := queryer.NewMockCgroupsQueryer(ctrl) mockQueryer.EXPECT(). - setCPUQuota(). - Return(ErrV2CPUQuotaUndefined) + SetCPUQuota(). + Return(queryer.ErrV2CPUQuotaUndefined) return &autoPprof{ - queryer: mockQueryer, + cgroupQueryer: mockQueryer, disableCPUProf: false, disableMemProf: false, } @@ -196,19 +206,19 @@ func TestAutoPprof_loadCPUQuota(t *testing.T) { newAp: func() *autoPprof { ctrl := gomock.NewController(t) - mockQueryer := NewMockqueryer(ctrl) + mockQueryer := queryer.NewMockCgroupsQueryer(ctrl) mockQueryer.EXPECT(). - setCPUQuota(). - Return(ErrV2CPUQuotaUndefined) + SetCPUQuota(). + Return(queryer.ErrV2CPUQuotaUndefined) return &autoPprof{ - queryer: mockQueryer, + cgroupQueryer: mockQueryer, disableCPUProf: false, disableMemProf: true, } }, wantDisableCPUProfFlag: false, - wantErr: ErrV2CPUQuotaUndefined, + wantErr: queryer.ErrV2CPUQuotaUndefined, }, } for _, tc := range testCases { @@ -233,9 +243,9 @@ func TestAutoPprof_watchCPUUsage(t *testing.T) { reported bool ) - mockQueryer := NewMockqueryer(ctrl) + mockQueryer := queryer.NewMockCgroupsQueryer(ctrl) mockQueryer.EXPECT(). - cpuUsage(). + CPUUsage(). AnyTimes(). DoAndReturn( func() (float64, error) { @@ -269,7 +279,7 @@ func TestAutoPprof_watchCPUUsage(t *testing.T) { disableMemProf: true, watchInterval: 1 * time.Second, cpuThreshold: 0.5, // 50%. - queryer: mockQueryer, + cgroupQueryer: mockQueryer, profiler: mockProfiler, reporter: mockReporter, stopC: make(chan struct{}), @@ -296,9 +306,9 @@ func TestAutoPprof_watchCPUUsage_consecutive(t *testing.T) { reportedCnt int ) - mockQueryer := NewMockqueryer(ctrl) + mockQueryer := queryer.NewMockCgroupsQueryer(ctrl) mockQueryer.EXPECT(). - cpuUsage(). + CPUUsage(). AnyTimes(). DoAndReturn( func() (float64, error) { @@ -333,7 +343,7 @@ func TestAutoPprof_watchCPUUsage_consecutive(t *testing.T) { watchInterval: 1 * time.Second, cpuThreshold: 0.5, // 50%. minConsecutiveOverThreshold: 3, - queryer: mockQueryer, + cgroupQueryer: mockQueryer, profiler: mockProfiler, reporter: mockReporter, stopC: make(chan struct{}), @@ -390,7 +400,7 @@ func TestAutoPprof_watchCPUUsage_reportBoth(t *testing.T) { testCases := []struct { name string fields fields - mockFunc func(*Mockqueryer, *Mockprofiler, *report.MockReporter) + mockFunc func(*queryer.MockCgroupsQueryer, *Mockprofiler, *report.MockReporter) }{ { name: "reportBoth: true", @@ -401,10 +411,10 @@ func TestAutoPprof_watchCPUUsage_reportBoth(t *testing.T) { disableMemProf: false, stopC: make(chan struct{}), }, - mockFunc: func(mockQueryer *Mockqueryer, mockProfiler *Mockprofiler, mockReporter *report.MockReporter) { + mockFunc: func(mockQueryer *queryer.MockCgroupsQueryer, mockProfiler *Mockprofiler, mockReporter *report.MockReporter) { gomock.InOrder( mockQueryer.EXPECT(). - cpuUsage(). + CPUUsage(). AnyTimes(). Return(0.6, nil), @@ -422,7 +432,7 @@ func TestAutoPprof_watchCPUUsage_reportBoth(t *testing.T) { Return(nil), mockQueryer.EXPECT(). - memUsage(). + MemUsage(). AnyTimes(). Return(0.2, nil), @@ -450,10 +460,10 @@ func TestAutoPprof_watchCPUUsage_reportBoth(t *testing.T) { disableMemProf: true, stopC: make(chan struct{}), }, - mockFunc: func(mockQueryer *Mockqueryer, mockProfiler *Mockprofiler, mockReporter *report.MockReporter) { + mockFunc: func(mockQueryer *queryer.MockCgroupsQueryer, mockProfiler *Mockprofiler, mockReporter *report.MockReporter) { gomock.InOrder( mockQueryer.EXPECT(). - cpuUsage(). + CPUUsage(). AnyTimes(). Return(0.6, nil), @@ -481,10 +491,10 @@ func TestAutoPprof_watchCPUUsage_reportBoth(t *testing.T) { disableMemProf: false, stopC: make(chan struct{}), }, - mockFunc: func(mockQueryer *Mockqueryer, mockProfiler *Mockprofiler, mockReporter *report.MockReporter) { + mockFunc: func(mockQueryer *queryer.MockCgroupsQueryer, mockProfiler *Mockprofiler, mockReporter *report.MockReporter) { gomock.InOrder( mockQueryer.EXPECT(). - cpuUsage(). + CPUUsage(). AnyTimes(). Return(0.6, nil), @@ -508,7 +518,7 @@ func TestAutoPprof_watchCPUUsage_reportBoth(t *testing.T) { t.Run(tc.name, func(t *testing.T) { ctrl := gomock.NewController(t) - mockQueryer := NewMockqueryer(ctrl) + mockQueryer := queryer.NewMockCgroupsQueryer(ctrl) mockProfiler := NewMockprofiler(ctrl) mockReporter := report.NewMockReporter(ctrl) @@ -516,7 +526,7 @@ func TestAutoPprof_watchCPUUsage_reportBoth(t *testing.T) { watchInterval: tc.fields.watchInterval, cpuThreshold: tc.fields.cpuThreshold, memThreshold: 0.5, // 50%. - queryer: mockQueryer, + cgroupQueryer: mockQueryer, profiler: mockProfiler, reporter: mockReporter, reportBoth: tc.fields.reportBoth, @@ -543,9 +553,9 @@ func TestAutoPprof_watchMemUsage(t *testing.T) { reported bool ) - mockQueryer := NewMockqueryer(ctrl) + mockQueryer := queryer.NewMockCgroupsQueryer(ctrl) mockQueryer.EXPECT(). - memUsage(). + MemUsage(). AnyTimes(). DoAndReturn( func() (float64, error) { @@ -577,7 +587,7 @@ func TestAutoPprof_watchMemUsage(t *testing.T) { disableCPUProf: true, watchInterval: 1 * time.Second, memThreshold: 0.2, // 20%. - queryer: mockQueryer, + cgroupQueryer: mockQueryer, profiler: mockProfiler, reporter: mockReporter, stopC: make(chan struct{}), @@ -604,9 +614,9 @@ func TestAutoPprof_watchMemUsage_consecutive(t *testing.T) { reportedCnt int ) - mockQueryer := NewMockqueryer(ctrl) + mockQueryer := queryer.NewMockCgroupsQueryer(ctrl) mockQueryer.EXPECT(). - memUsage(). + MemUsage(). AnyTimes(). DoAndReturn( func() (float64, error) { @@ -641,7 +651,7 @@ func TestAutoPprof_watchMemUsage_consecutive(t *testing.T) { watchInterval: 1 * time.Second, memThreshold: 0.2, // 20%. minConsecutiveOverThreshold: 3, - queryer: mockQueryer, + cgroupQueryer: mockQueryer, profiler: mockProfiler, reporter: mockReporter, stopC: make(chan struct{}), @@ -698,7 +708,7 @@ func TestAutoPprof_watchMemUsage_reportBoth(t *testing.T) { testCases := []struct { name string fields fields - mockFunc func(*Mockqueryer, *Mockprofiler, *report.MockReporter) + mockFunc func(*queryer.MockCgroupsQueryer, *Mockprofiler, *report.MockReporter) }{ { name: "reportBoth: true", @@ -709,10 +719,10 @@ func TestAutoPprof_watchMemUsage_reportBoth(t *testing.T) { disableCPUProf: false, stopC: make(chan struct{}), }, - mockFunc: func(mockQueryer *Mockqueryer, mockProfiler *Mockprofiler, mockReporter *report.MockReporter) { + mockFunc: func(mockQueryer *queryer.MockCgroupsQueryer, mockProfiler *Mockprofiler, mockReporter *report.MockReporter) { gomock.InOrder( mockQueryer.EXPECT(). - memUsage(). + MemUsage(). AnyTimes(). Return(0.6, nil), @@ -730,7 +740,7 @@ func TestAutoPprof_watchMemUsage_reportBoth(t *testing.T) { Return(nil), mockQueryer.EXPECT(). - cpuUsage(). + CPUUsage(). AnyTimes(). Return(0.2, nil), @@ -758,10 +768,10 @@ func TestAutoPprof_watchMemUsage_reportBoth(t *testing.T) { disableCPUProf: true, stopC: make(chan struct{}), }, - mockFunc: func(mockQueryer *Mockqueryer, mockProfiler *Mockprofiler, mockReporter *report.MockReporter) { + mockFunc: func(mockQueryer *queryer.MockCgroupsQueryer, mockProfiler *Mockprofiler, mockReporter *report.MockReporter) { gomock.InOrder( mockQueryer.EXPECT(). - memUsage(). + MemUsage(). AnyTimes(). Return(0.6, nil), @@ -789,10 +799,10 @@ func TestAutoPprof_watchMemUsage_reportBoth(t *testing.T) { disableCPUProf: false, stopC: make(chan struct{}), }, - mockFunc: func(mockQueryer *Mockqueryer, mockProfiler *Mockprofiler, mockReporter *report.MockReporter) { + mockFunc: func(mockQueryer *queryer.MockCgroupsQueryer, mockProfiler *Mockprofiler, mockReporter *report.MockReporter) { gomock.InOrder( mockQueryer.EXPECT(). - memUsage(). + MemUsage(). AnyTimes(). Return(0.6, nil), @@ -816,7 +826,7 @@ func TestAutoPprof_watchMemUsage_reportBoth(t *testing.T) { t.Run(tc.name, func(t *testing.T) { ctrl := gomock.NewController(t) - mockQueryer := NewMockqueryer(ctrl) + mockQueryer := queryer.NewMockCgroupsQueryer(ctrl) mockProfiler := NewMockprofiler(ctrl) mockReporter := report.NewMockReporter(ctrl) @@ -824,7 +834,7 @@ func TestAutoPprof_watchMemUsage_reportBoth(t *testing.T) { watchInterval: tc.fields.watchInterval, cpuThreshold: 0.5, // 50%. memThreshold: tc.fields.memThreshold, - queryer: mockQueryer, + cgroupQueryer: mockQueryer, profiler: mockProfiler, reporter: mockReporter, reportBoth: tc.fields.reportBoth, @@ -843,6 +853,160 @@ func TestAutoPprof_watchMemUsage_reportBoth(t *testing.T) { } } +func TestAutoPprof_watchGoroutineCount(t *testing.T) { + ctrl := gomock.NewController(t) + + var ( + profiled bool + reported bool + ) + + mockQueryer := queryer.NewMockRuntimeQueryer(ctrl) + mockQueryer.EXPECT(). + GoroutineCount(). + AnyTimes(). + DoAndReturn( + func() (int, error) { + return 200, nil + }, + ) + + mockProfiler := NewMockprofiler(ctrl) + mockProfiler.EXPECT(). + profileGoroutine(). + DoAndReturn( + func() ([]byte, error) { + profiled = true + return []byte("prof"), nil + }, + ) + + mockReporter := report.NewMockReporter(ctrl) + mockReporter.EXPECT(). + ReportGoroutineProfile(gomock.Any(), gomock.Any(), gomock.Any()). + DoAndReturn( + func(_ context.Context, _ io.Reader, _ report.GoroutineInfo) error { + reported = true + return nil + }, + ) + + ap := &autoPprof{ + disableCPUProf: true, + disableMemProf: true, + watchInterval: 1 * time.Second, + goroutineThreshold: 100, + runtimeQueryer: mockQueryer, + profiler: mockProfiler, + reporter: mockReporter, + stopC: make(chan struct{}), + } + + go ap.watchGoroutineCount() + t.Cleanup(func() { ap.stop() }) + + // Wait for profiling and reporting. + time.Sleep(1050 * time.Millisecond) + if !profiled { + t.Errorf("goroutine count is not profiled") + } + if !reported { + t.Errorf("goroutine count is not reported") + } +} + +func TestAutoPprof_watchGoroutineCount_consecutive(t *testing.T) { + ctrl := gomock.NewController(t) + + var ( + profiledCnt int + reportedCnt int + ) + + mockQueryer := queryer.NewMockRuntimeQueryer(ctrl) + mockQueryer.EXPECT(). + GoroutineCount(). + AnyTimes(). + DoAndReturn( + func() (int, error) { + return 200, nil + }, + ) + + mockProfiler := NewMockprofiler(ctrl) + mockProfiler.EXPECT(). + profileGoroutine(). + AnyTimes(). + DoAndReturn( + func() ([]byte, error) { + profiledCnt++ + return []byte("prof"), nil + }, + ) + + mockReporter := report.NewMockReporter(ctrl) + mockReporter.EXPECT(). + ReportGoroutineProfile(gomock.Any(), gomock.Any(), gomock.Any()). + AnyTimes(). + DoAndReturn( + func(_ context.Context, _ io.Reader, _ report.GoroutineInfo) error { + reportedCnt++ + return nil + }, + ) + + ap := &autoPprof{ + disableCPUProf: true, + disableMemProf: true, + watchInterval: 1 * time.Second, + goroutineThreshold: 100, + minConsecutiveOverThreshold: 3, + runtimeQueryer: mockQueryer, + profiler: mockProfiler, + reporter: mockReporter, + stopC: make(chan struct{}), + } + + go ap.watchGoroutineCount() + t.Cleanup(func() { ap.stop() }) + + // Wait for profiling and reporting. + time.Sleep(1050 * time.Millisecond) + if profiledCnt != 1 { + t.Errorf("goroutine count is profiled %d times, want 1", profiledCnt) + } + if reportedCnt != 1 { + t.Errorf("goroutine count is reported %d times, want 1", reportedCnt) + } + + time.Sleep(1050 * time.Millisecond) + // 2nd time. It shouldn't be profiled and reported. + if profiledCnt != 1 { + t.Errorf("goroutine count is profiled %d times, want 1", profiledCnt) + } + if reportedCnt != 1 { + t.Errorf("goroutine count is reported %d times, want 1", reportedCnt) + } + + time.Sleep(1050 * time.Millisecond) + // 3rd time. It shouldn't be profiled and reported. + if profiledCnt != 1 { + t.Errorf("goroutine count is profiled %d times, want 1", profiledCnt) + } + if reportedCnt != 1 { + t.Errorf("goroutine count is reported %d times, want 1", reportedCnt) + } + + time.Sleep(1050 * time.Millisecond) + // 4th time. Now it should be profiled and reported. + if profiledCnt != 2 { + t.Errorf("goroutine count is profiled %d times, want 2", profiledCnt) + } + if reportedCnt != 2 { + t.Errorf("goroutine count is reported %d times, want 2", reportedCnt) + } +} + func fib(n int) int64 { if n <= 1 { return int64(n) @@ -858,13 +1022,13 @@ func BenchmarkLightJob(b *testing.B) { func BenchmarkLightJobWithWatchCPUUsage(b *testing.B) { var ( - qryer, _ = newQueryer() + qryer, _ = queryer.NewCgroupQueryer() ticker = time.NewTicker(defaultWatchInterval) ) for i := 0; i < b.N; i++ { select { case <-ticker.C: - _, _ = qryer.cpuUsage() + _, _ = qryer.CPUUsage() default: fib(10) } @@ -873,13 +1037,13 @@ func BenchmarkLightJobWithWatchCPUUsage(b *testing.B) { func BenchmarkLightJobWithWatchMemUsage(b *testing.B) { var ( - qryer, _ = newQueryer() + qryer, _ = queryer.NewCgroupQueryer() ticker = time.NewTicker(defaultWatchInterval) ) for i := 0; i < b.N; i++ { select { case <-ticker.C: - _, _ = qryer.memUsage() + _, _ = qryer.MemUsage() default: fib(10) } @@ -894,13 +1058,13 @@ func BenchmarkHeavyJob(b *testing.B) { func BenchmarkHeavyJobWithWatchCPUUsage(b *testing.B) { var ( - qryer, _ = newQueryer() + qryer, _ = queryer.NewCgroupQueryer() ticker = time.NewTicker(defaultWatchInterval) ) for i := 0; i < b.N; i++ { select { case <-ticker.C: - _, _ = qryer.cpuUsage() + _, _ = qryer.CPUUsage() default: fib(24) } @@ -909,15 +1073,81 @@ func BenchmarkHeavyJobWithWatchCPUUsage(b *testing.B) { func BenchmarkHeavyJobWithWatchMemUsage(b *testing.B) { var ( - qryer, _ = newQueryer() + qryer, _ = queryer.NewCgroupQueryer() ticker = time.NewTicker(defaultWatchInterval) ) for i := 0; i < b.N; i++ { select { case <-ticker.C: - _, _ = qryer.memUsage() + _, _ = qryer.MemUsage() default: fib(24) } } } + +func fibAsync(n int) int64 { + if n <= 1 { + return int64(n) + } + + var ( + v int64 + m sync.Mutex + wg sync.WaitGroup + ) + + wg.Add(1) + go func() { + defer wg.Done() + + m.Lock() + defer m.Unlock() + v = fibAsync(n-1) + fibAsync(n-2) + }() + wg.Wait() + + return v +} + +func BenchmarkLightAsyncJob(b *testing.B) { + for i := 0; i < b.N; i++ { + fibAsync(10) + } +} + +func BenchmarkLightAsyncJobWithWatchGoroutineCount(b *testing.B) { + var ( + qryer, _ = queryer.NewRuntimeQueryer() + ticker = time.NewTicker(defaultWatchInterval) + ) + for i := 0; i < b.N; i++ { + select { + case <-ticker.C: + _ = qryer.GoroutineCount() + default: + fibAsync(10) + } + } +} + +func BenchmarkHeavyAsyncJob(b *testing.B) { + for i := 0; i < b.N; i++ { + fibAsync(24) + } +} + +func BenchmarkHeavyAsyncJobWithWatchGoroutineCount(b *testing.B) { + var ( + qryer, _ = queryer.NewRuntimeQueryer() + ticker = time.NewTicker(defaultWatchInterval) + ) + for i := 0; i < b.N; i++ { + select { + case <-ticker.C: + _ = qryer.GoroutineCount() + default: + fibAsync(24) + } + } +} diff --git a/cgroups.go b/cgroups.go deleted file mode 100644 index 25d1043..0000000 --- a/cgroups.go +++ /dev/null @@ -1,31 +0,0 @@ -//go:build linux -// +build linux - -package autopprof - -import ( - "github.com/containerd/cgroups" -) - -//go:generate mockgen -source=cgroups.go -destination=cgroups_mock.go -package=autopprof - -const ( - cpuUsageSnapshotQueueSize = 24 // 24 * 5s = 2 minutes. -) - -type queryer interface { - cpuUsage() (float64, error) - memUsage() (float64, error) - - setCPUQuota() error -} - -func newQueryer() (queryer, error) { - switch cgroups.Mode() { - case cgroups.Legacy: - return newCgroupsV1(), nil - case cgroups.Hybrid, cgroups.Unified: - return newCgroupsV2(), nil - } - return nil, ErrCgroupsUnavailable -} diff --git a/cgroups_mock.go b/cgroups_mock.go deleted file mode 100644 index 2f51c06..0000000 --- a/cgroups_mock.go +++ /dev/null @@ -1,78 +0,0 @@ -// Code generated by MockGen. DO NOT EDIT. -// Source: cgroups.go - -// Package autopprof is a generated GoMock package. -package autopprof - -import ( - reflect "reflect" - - gomock "github.com/golang/mock/gomock" -) - -// Mockqueryer is a mock of queryer interface. -type Mockqueryer struct { - ctrl *gomock.Controller - recorder *MockqueryerMockRecorder -} - -// MockqueryerMockRecorder is the mock recorder for Mockqueryer. -type MockqueryerMockRecorder struct { - mock *Mockqueryer -} - -// NewMockqueryer creates a new mock instance. -func NewMockqueryer(ctrl *gomock.Controller) *Mockqueryer { - mock := &Mockqueryer{ctrl: ctrl} - mock.recorder = &MockqueryerMockRecorder{mock} - return mock -} - -// EXPECT returns an object that allows the caller to indicate expected use. -func (m *Mockqueryer) EXPECT() *MockqueryerMockRecorder { - return m.recorder -} - -// cpuUsage mocks base method. -func (m *Mockqueryer) cpuUsage() (float64, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "cpuUsage") - ret0, _ := ret[0].(float64) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// cpuUsage indicates an expected call of cpuUsage. -func (mr *MockqueryerMockRecorder) cpuUsage() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "cpuUsage", reflect.TypeOf((*Mockqueryer)(nil).cpuUsage)) -} - -// memUsage mocks base method. -func (m *Mockqueryer) memUsage() (float64, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "memUsage") - ret0, _ := ret[0].(float64) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// memUsage indicates an expected call of memUsage. -func (mr *MockqueryerMockRecorder) memUsage() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "memUsage", reflect.TypeOf((*Mockqueryer)(nil).memUsage)) -} - -// setCPUQuota mocks base method. -func (m *Mockqueryer) setCPUQuota() error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "setCPUQuota") - ret0, _ := ret[0].(error) - return ret0 -} - -// setCPUQuota indicates an expected call of setCPUQuota. -func (mr *MockqueryerMockRecorder) setCPUQuota() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "setCPUQuota", reflect.TypeOf((*Mockqueryer)(nil).setCPUQuota)) -} diff --git a/error.go b/error.go index edba05f..718255e 100644 --- a/error.go +++ b/error.go @@ -7,16 +7,15 @@ var ( ErrUnsupportedPlatform = fmt.Errorf( "autopprof: unsupported platform (only Linux is supported)", ) - ErrCgroupsUnavailable = fmt.Errorf("autopprof: cgroups is unavailable") ErrInvalidCPUThreshold = fmt.Errorf( "autopprof: cpu threshold value must be between 0 and 1", ) ErrInvalidMemThreshold = fmt.Errorf( "autopprof: memory threshold value must be between 0 and 1", ) + ErrInvalidGoroutineThreshold = fmt.Errorf( + "autopprof: goroutine threshold value must be greater than to 0", + ) ErrNilReporter = fmt.Errorf("autopprof: Reporter can't be nil") ErrDisableAllProfiling = fmt.Errorf("autopprof: all profiling is disabled") - ErrV2CPUQuotaUndefined = fmt.Errorf("autopprof: v2 cpu quota is undefined") - ErrV2CPUMaxEmpty = fmt.Errorf("autopprof: v2 cpu.max is empty") - ErrV1CPUSubsystemEmpty = fmt.Errorf("autopprof: v1 cpu subsystem is empty") ) diff --git a/examples/go.mod b/examples/go.mod index d5a531e..647ae4f 100644 --- a/examples/go.mod +++ b/examples/go.mod @@ -17,5 +17,5 @@ require ( github.com/opencontainers/runtime-spec v1.0.2 // indirect github.com/sirupsen/logrus v1.8.1 // indirect github.com/slack-go/slack v0.11.3 // indirect - golang.org/x/sys v0.0.0-20210510120138-977fb7262007 // indirect + golang.org/x/sys v0.1.0 // indirect ) diff --git a/examples/go.sum b/examples/go.sum index feeb5c6..ff95b39 100644 --- a/examples/go.sum +++ b/examples/go.sum @@ -60,6 +60,7 @@ golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210510120138-977fb7262007 h1:gG67DSER+11cZvqIMb8S8bt0vZtiN6xWYARwirrOSfE= golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= diff --git a/option.go b/option.go index 1dd45d6..6b836a0 100644 --- a/option.go +++ b/option.go @@ -9,6 +9,7 @@ import ( const ( defaultCPUThreshold = 0.75 defaultMemThreshold = 0.75 + defaultGoroutineThreshold = 50000 defaultWatchInterval = 5 * time.Second defaultCPUProfilingDuration = 10 * time.Second defaultMinConsecutiveOverThreshold = 12 // min 1 minute. (12*5s) @@ -20,6 +21,8 @@ type Option struct { DisableCPUProf bool // DisableMemProf disables the memory profiling. DisableMemProf bool + // DisableGoroutineProf disables the goroutine profiling. + DisableGoroutineProf bool // CPUThreshold is the cpu usage threshold (between 0 and 1) // to trigger the cpu profiling. @@ -33,6 +36,12 @@ type Option struct { // is higher than this threshold. MemThreshold float64 + // GoroutineThreshold is the goroutine count threshold to trigger the goroutine profiling. + // to trigger the goroutine profiling. + // Autopprof will start the goroutine profiling when the goroutine count + // is higher than this threshold. + GoroutineThreshold int + // ReportBoth sets whether to trigger reports for both CPU and memory when either threshold is exceeded. // If some profiling is disabled, exclude it. ReportBoth bool @@ -44,7 +53,7 @@ type Option struct { // NOTE(mingrammer): testing the validate() is done in autopprof_test.go. func (o Option) validate() error { - if o.DisableCPUProf && o.DisableMemProf { + if o.DisableCPUProf && o.DisableMemProf && o.DisableGoroutineProf { return ErrDisableAllProfiling } if o.CPUThreshold < 0 || o.CPUThreshold > 1 { @@ -53,6 +62,9 @@ func (o Option) validate() error { if o.MemThreshold < 0 || o.MemThreshold > 1 { return ErrInvalidMemThreshold } + if o.GoroutineThreshold < 0 { + return ErrInvalidGoroutineThreshold + } if o.Reporter == nil { return ErrNilReporter } diff --git a/profile.go b/profile.go index 8f88d28..6a3b25e 100644 --- a/profile.go +++ b/profile.go @@ -14,6 +14,8 @@ type profiler interface { profileCPU() ([]byte, error) // profileHeap profiles the heap usage. profileHeap() ([]byte, error) + // profileGoroutine profiles the goroutine usage. + profileGoroutine() ([]byte, error) } type defaultProfiler struct { @@ -51,7 +53,21 @@ func (p *defaultProfiler) profileHeap() ([]byte, error) { buf bytes.Buffer w = bufio.NewWriter(&buf) ) - if err := pprof.WriteHeapProfile(w); err != nil { + if err := pprof.Lookup("heap").WriteTo(w, 0); err != nil { + return nil, err + } + if err := w.Flush(); err != nil { + return nil, err + } + return buf.Bytes(), nil +} + +func (p *defaultProfiler) profileGoroutine() ([]byte, error) { + var ( + buf bytes.Buffer + w = bufio.NewWriter(&buf) + ) + if err := pprof.Lookup("goroutine").WriteTo(w, 0); err != nil { return nil, err } if err := w.Flush(); err != nil { diff --git a/profile_mock.go b/profile_mock.go index 290bfe2..a5957fb 100644 --- a/profile_mock.go +++ b/profile_mock.go @@ -48,6 +48,21 @@ func (mr *MockprofilerMockRecorder) profileCPU() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "profileCPU", reflect.TypeOf((*Mockprofiler)(nil).profileCPU)) } +// profileGoroutine mocks base method. +func (m *Mockprofiler) profileGoroutine() ([]byte, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "profileGoroutine") + ret0, _ := ret[0].([]byte) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// profileGoroutine indicates an expected call of profileGoroutine. +func (mr *MockprofilerMockRecorder) profileGoroutine() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "profileGoroutine", reflect.TypeOf((*Mockprofiler)(nil).profileGoroutine)) +} + // profileHeap mocks base method. func (m *Mockprofiler) profileHeap() ([]byte, error) { m.ctrl.T.Helper() diff --git a/cgroupv1.go b/queryer/cgroupv1.go similarity index 93% rename from cgroupv1.go rename to queryer/cgroupv1.go index 74bcab7..97cb007 100644 --- a/cgroupv1.go +++ b/queryer/cgroupv1.go @@ -1,7 +1,7 @@ //go:build linux // +build linux -package autopprof +package queryer import ( "bufio" @@ -45,7 +45,38 @@ func newCgroupsV1() *cgroupV1 { } } -func (c *cgroupV1) setCPUQuota() error { +func (c *cgroupV1) CPUUsage() (float64, error) { + stat, err := c.stat() + if err != nil { + return 0, err + } + c.snapshotCPUUsage(stat.CPU.Usage.Total) // In nanoseconds. + + // Calculate the usage only if there are enough snapshots. + if !c.q.isFull() { + return 0, nil + } + + s1, s2 := c.q.head(), c.q.tail() + delta := time.Duration(s2.usage-s1.usage) * cgroupV1UsageUnit + duration := s2.timestamp.Sub(s1.timestamp) + return (float64(delta) / float64(duration)) / c.cpuQuota, nil +} + +func (c *cgroupV1) MemUsage() (float64, error) { + stat, err := c.stat() + if err != nil { + return 0, err + } + var ( + sm = stat.Memory + usage = sm.Usage.Usage - sm.InactiveFile + limit = sm.HierarchicalMemoryLimit + ) + return float64(usage) / float64(limit), nil +} + +func (c *cgroupV1) SetCPUQuota() error { quota, err := c.parseCPU(cgroupV1CPUQuotaFile) if err != nil { return err @@ -80,37 +111,6 @@ func (c *cgroupV1) stat() (*v1.Metrics, error) { return stat, nil } -func (c *cgroupV1) cpuUsage() (float64, error) { - stat, err := c.stat() - if err != nil { - return 0, err - } - c.snapshotCPUUsage(stat.CPU.Usage.Total) // In nanoseconds. - - // Calculate the usage only if there are enough snapshots. - if !c.q.isFull() { - return 0, nil - } - - s1, s2 := c.q.head(), c.q.tail() - delta := time.Duration(s2.usage-s1.usage) * cgroupV1UsageUnit - duration := s2.timestamp.Sub(s1.timestamp) - return (float64(delta) / float64(duration)) / c.cpuQuota, nil -} - -func (c *cgroupV1) memUsage() (float64, error) { - stat, err := c.stat() - if err != nil { - return 0, err - } - var ( - sm = stat.Memory - usage = sm.Usage.Usage - sm.InactiveFile - limit = sm.HierarchicalMemoryLimit - ) - return float64(usage) / float64(limit), nil -} - func (c *cgroupV1) parseCPU(filename string) (int, error) { f, err := os.Open( path.Join(c.mountPoint, c.cpuSubsystem, filename), diff --git a/cgroupv1_test.go b/queryer/cgroupv1_test.go similarity index 56% rename from cgroupv1_test.go rename to queryer/cgroupv1_test.go index aaf1d45..eab3671 100644 --- a/cgroupv1_test.go +++ b/queryer/cgroupv1_test.go @@ -1,7 +1,7 @@ //go:build linux // +build linux -package autopprof +package queryer import ( "testing" @@ -10,7 +10,7 @@ import ( "github.com/containerd/cgroups" ) -func TestCgroupV1_cpuUsage(t *testing.T) { +func TestCgroupV1_CPUUsage(t *testing.T) { mode := cgroups.Mode() if mode != cgroups.Legacy { t.Skip("cgroup v1 is not available") @@ -19,57 +19,57 @@ func TestCgroupV1_cpuUsage(t *testing.T) { cgv1.cpuQuota = 2 cgv1.q = newCPUUsageSnapshotQueue(3) - usage, err := cgv1.cpuUsage() + usage, err := cgv1.CPUUsage() if err != nil { - t.Errorf("cpuUsage() = %v, want nil", err) + t.Errorf("CPUUsage() = %v, want nil", err) } if usage != 0 { // The cpu usage is 0 until the queue is full. - t.Errorf("cpuUsage() = %f, want 0", usage) + t.Errorf("CPUUsage() = %f, want 0", usage) } time.Sleep(1050 * time.Millisecond) - usage, err = cgv1.cpuUsage() + usage, err = cgv1.CPUUsage() if err != nil { - t.Errorf("cpuUsage() = %v, want nil", err) + t.Errorf("CPUUsage() = %v, want nil", err) } if usage != 0 { // The cpu usage is 0 until the queue is full. - t.Errorf("cpuUsage() = %f, want 0", usage) + t.Errorf("CPUUsage() = %f, want 0", usage) } time.Sleep(1050 * time.Millisecond) - usage, err = cgv1.cpuUsage() + usage, err = cgv1.CPUUsage() if err != nil { - t.Errorf("cpuUsage() = %v, want nil", err) + t.Errorf("CPUUsage() = %v, want nil", err) } if usage < 0 || usage > 1 { - t.Errorf("cpuUsage() = %f, want between 0 and 1", usage) + t.Errorf("CPUUsage() = %f, want between 0 and 1", usage) } } -func TestCgroupV1_memUsage(t *testing.T) { +func TestCgroupV1_MemUsage(t *testing.T) { mode := cgroups.Mode() if mode != cgroups.Legacy { t.Skip("cgroup v1 is not available") } - usage, err := newCgroupsV1().memUsage() + usage, err := newCgroupsV1().MemUsage() if err != nil { - t.Errorf("memUsage() = %v, want nil", err) + t.Errorf("MemUsage() = %v, want nil", err) } if usage < 0 || usage > 1 { - t.Errorf("memUsage() = %f, want between 0 and 1", usage) + t.Errorf("MemUsage() = %f, want between 0 and 1", usage) } } -func TestCgroupV1_setCPUQuota(t *testing.T) { +func TestCgroupV1_SetCPUQuota(t *testing.T) { mode := cgroups.Mode() if mode != cgroups.Legacy { t.Skip("cgroup v1 is not available") } cgv1 := newCgroupsV1() - if err := cgv1.setCPUQuota(); err != nil { - t.Errorf("setCPUQuota() = %v, want nil", err) + if err := cgv1.SetCPUQuota(); err != nil { + t.Errorf("SetCPUQuota() = %v, want nil", err) } // The cpu quota of test docker container is 1.5. if cgv1.cpuQuota != 1.5 { diff --git a/cgroupv2.go b/queryer/cgroupv2.go similarity index 94% rename from cgroupv2.go rename to queryer/cgroupv2.go index 9f7da9a..a44b95f 100644 --- a/cgroupv2.go +++ b/queryer/cgroupv2.go @@ -1,7 +1,7 @@ //go:build linux // +build linux -package autopprof +package queryer import ( "bufio" @@ -49,7 +49,38 @@ func newCgroupsV2() *cgroupV2 { } } -func (c *cgroupV2) setCPUQuota() error { +func (c *cgroupV2) CPUUsage() (float64, error) { + stat, err := c.stat() + if err != nil { + return 0, err + } + c.snapshotCPUUsage(stat.CPU.UsageUsec) // In microseconds. + + // Calculate the usage only if there are enough snapshots. + if !c.q.isFull() { + return 0, nil + } + + s1, s2 := c.q.head(), c.q.tail() + delta := time.Duration(s2.usage-s1.usage) * cgroupV2UsageUnit + duration := s2.timestamp.Sub(s1.timestamp) + return (float64(delta) / float64(duration)) / c.cpuQuota, nil +} + +func (c *cgroupV2) MemUsage() (float64, error) { + stat, err := c.stat() + if err != nil { + return 0, err + } + var ( + sm = stat.Memory + usage = sm.Usage - sm.InactiveFile + limit = sm.UsageLimit + ) + return float64(usage) / float64(limit), nil +} + +func (c *cgroupV2) SetCPUQuota() error { f, err := os.Open( path.Join(c.mountPoint, c.cpuMaxFile), ) @@ -115,34 +146,3 @@ func (c *cgroupV2) stat() (*stats.Metrics, error) { } return stat, nil } - -func (c *cgroupV2) cpuUsage() (float64, error) { - stat, err := c.stat() - if err != nil { - return 0, err - } - c.snapshotCPUUsage(stat.CPU.UsageUsec) // In microseconds. - - // Calculate the usage only if there are enough snapshots. - if !c.q.isFull() { - return 0, nil - } - - s1, s2 := c.q.head(), c.q.tail() - delta := time.Duration(s2.usage-s1.usage) * cgroupV2UsageUnit - duration := s2.timestamp.Sub(s1.timestamp) - return (float64(delta) / float64(duration)) / c.cpuQuota, nil -} - -func (c *cgroupV2) memUsage() (float64, error) { - stat, err := c.stat() - if err != nil { - return 0, err - } - var ( - sm = stat.Memory - usage = sm.Usage - sm.InactiveFile - limit = sm.UsageLimit - ) - return float64(usage) / float64(limit), nil -} diff --git a/cgroupv2_test.go b/queryer/cgroupv2_test.go similarity index 59% rename from cgroupv2_test.go rename to queryer/cgroupv2_test.go index cedcd78..dd7fdc2 100644 --- a/cgroupv2_test.go +++ b/queryer/cgroupv2_test.go @@ -1,7 +1,7 @@ //go:build linux // +build linux -package autopprof +package queryer import ( "testing" @@ -10,7 +10,7 @@ import ( "github.com/containerd/cgroups" ) -func TestCgroupV2_cpuUsage(t *testing.T) { +func TestCgroupV2_CPUUsage(t *testing.T) { mode := cgroups.Mode() if mode != cgroups.Hybrid && mode != cgroups.Unified { t.Skip("cgroup v2 is not available") @@ -19,58 +19,58 @@ func TestCgroupV2_cpuUsage(t *testing.T) { cgv2.cpuQuota = 2 cgv2.q = newCPUUsageSnapshotQueue(3) - usage, err := cgv2.cpuUsage() + usage, err := cgv2.CPUUsage() if err != nil { - t.Errorf("cpuUsage() = %v, want nil", err) + t.Errorf("CPUUsage() = %v, want nil", err) } if usage != 0 { // The cpu usage is 0 until the queue is full. - t.Errorf("cpuUsage() = %f, want 0", usage) + t.Errorf("CPUUsage() = %f, want 0", usage) } time.Sleep(1050 * time.Millisecond) - usage, err = cgv2.cpuUsage() + usage, err = cgv2.CPUUsage() if err != nil { - t.Errorf("cpuUsage() = %v, want nil", err) + t.Errorf("CPUUsage() = %v, want nil", err) } if usage != 0 { // The cpu usage is 0 until the queue is full. - t.Errorf("cpuUsage() = %f, want 0", usage) + t.Errorf("CPUUsage() = %f, want 0", usage) } time.Sleep(1050 * time.Millisecond) - usage, err = cgv2.cpuUsage() + usage, err = cgv2.CPUUsage() if err != nil { - t.Errorf("cpuUsage() = %v, want nil", err) + t.Errorf("CPUUsage() = %v, want nil", err) } if usage < 0 || usage > 1 { - t.Errorf("cpuUsage() = %f, want between 0 and 1", usage) + t.Errorf("CPUUsage() = %f, want between 0 and 1", usage) } } -func TestCgroupV2_memUsage(t *testing.T) { +func TestCgroupV2_MemUsage(t *testing.T) { mode := cgroups.Mode() if mode != cgroups.Hybrid && mode != cgroups.Unified { t.Skip("cgroup v2 is not available") } cgv2 := newCgroupsV2() - usage, err := cgv2.memUsage() + usage, err := cgv2.MemUsage() if err != nil { - t.Errorf("memUsage() = %v, want nil", err) + t.Errorf("MemUsage() = %v, want nil", err) } if usage < 0 || usage > 1 { - t.Errorf("memUsage() = %f, want between 0 and 1", usage) + t.Errorf("MemUsage() = %f, want between 0 and 1", usage) } } -func TestCgroupV2_setCPUQuota(t *testing.T) { +func TestCgroupV2_SetCPUQuota(t *testing.T) { mode := cgroups.Mode() if mode != cgroups.Hybrid && mode != cgroups.Unified { t.Skip("cgroup v2 is not available") } cgv2 := newCgroupsV2() - if err := cgv2.setCPUQuota(); err != nil { - t.Errorf("setCPUQuota() = %v, want nil", err) + if err := cgv2.SetCPUQuota(); err != nil { + t.Errorf("SetCPUQuota() = %v, want nil", err) } // The cpu quota of test docker container is 1.5. if cgv2.cpuQuota != 1.5 { diff --git a/queryer/error.go b/queryer/error.go new file mode 100644 index 0000000..ec517ac --- /dev/null +++ b/queryer/error.go @@ -0,0 +1,11 @@ +package queryer + +import "fmt" + +// Errors. +var ( + ErrCgroupsUnavailable = fmt.Errorf("autopprof: cgroups is unavailable") + ErrV2CPUQuotaUndefined = fmt.Errorf("autopprof: v2 cpu quota is undefined") + ErrV2CPUMaxEmpty = fmt.Errorf("autopprof: v2 cpu.max is empty") + ErrV1CPUSubsystemEmpty = fmt.Errorf("autopprof: v1 cpu subsystem is empty") +) diff --git a/queryer/queryer.go b/queryer/queryer.go new file mode 100644 index 0000000..6be4d4e --- /dev/null +++ b/queryer/queryer.go @@ -0,0 +1,39 @@ +//go:build linux +// +build linux + +package queryer + +import ( + "github.com/containerd/cgroups" +) + +//go:generate mockgen -source=queryer.go -destination=queryer_mock.go -package=queryer + +const ( + cpuUsageSnapshotQueueSize = 24 // 24 * 5s = 2 minutes. +) + +type CgroupsQueryer interface { + CPUUsage() (float64, error) + MemUsage() (float64, error) + + SetCPUQuota() error +} + +type RuntimeQueryer interface { + GoroutineCount() int +} + +func NewCgroupQueryer() (CgroupsQueryer, error) { + switch cgroups.Mode() { + case cgroups.Legacy: + return newCgroupsV1(), nil + case cgroups.Hybrid, cgroups.Unified: + return newCgroupsV2(), nil + } + return nil, ErrCgroupsUnavailable +} + +func NewRuntimeQueryer() (RuntimeQueryer, error) { + return newRuntimeQuery(), nil +} diff --git a/queryer/queryer_mock.go b/queryer/queryer_mock.go new file mode 100644 index 0000000..4d14f72 --- /dev/null +++ b/queryer/queryer_mock.go @@ -0,0 +1,115 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: queryer.go + +// Package queryer is a generated GoMock package. +package queryer + +import ( + reflect "reflect" + + gomock "github.com/golang/mock/gomock" +) + +// MockCgroupsQueryer is a mock of CgroupsQueryer interface. +type MockCgroupsQueryer struct { + ctrl *gomock.Controller + recorder *MockCgroupsQueryerMockRecorder +} + +// MockCgroupsQueryerMockRecorder is the mock recorder for MockCgroupsQueryer. +type MockCgroupsQueryerMockRecorder struct { + mock *MockCgroupsQueryer +} + +// NewMockCgroupsQueryer creates a new mock instance. +func NewMockCgroupsQueryer(ctrl *gomock.Controller) *MockCgroupsQueryer { + mock := &MockCgroupsQueryer{ctrl: ctrl} + mock.recorder = &MockCgroupsQueryerMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockCgroupsQueryer) EXPECT() *MockCgroupsQueryerMockRecorder { + return m.recorder +} + +// CPUUsage mocks base method. +func (m *MockCgroupsQueryer) CPUUsage() (float64, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CPUUsage") + ret0, _ := ret[0].(float64) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// CPUUsage indicates an expected call of CPUUsage. +func (mr *MockCgroupsQueryerMockRecorder) CPUUsage() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CPUUsage", reflect.TypeOf((*MockCgroupsQueryer)(nil).CPUUsage)) +} + +// MemUsage mocks base method. +func (m *MockCgroupsQueryer) MemUsage() (float64, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "MemUsage") + ret0, _ := ret[0].(float64) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// MemUsage indicates an expected call of MemUsage. +func (mr *MockCgroupsQueryerMockRecorder) MemUsage() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MemUsage", reflect.TypeOf((*MockCgroupsQueryer)(nil).MemUsage)) +} + +// SetCPUQuota mocks base method. +func (m *MockCgroupsQueryer) SetCPUQuota() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SetCPUQuota") + ret0, _ := ret[0].(error) + return ret0 +} + +// SetCPUQuota indicates an expected call of SetCPUQuota. +func (mr *MockCgroupsQueryerMockRecorder) SetCPUQuota() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetCPUQuota", reflect.TypeOf((*MockCgroupsQueryer)(nil).SetCPUQuota)) +} + +// MockRuntimeQueryer is a mock of RuntimeQueryer interface. +type MockRuntimeQueryer struct { + ctrl *gomock.Controller + recorder *MockRuntimeQueryerMockRecorder +} + +// MockRuntimeQueryerMockRecorder is the mock recorder for MockRuntimeQueryer. +type MockRuntimeQueryerMockRecorder struct { + mock *MockRuntimeQueryer +} + +// NewMockRuntimeQueryer creates a new mock instance. +func NewMockRuntimeQueryer(ctrl *gomock.Controller) *MockRuntimeQueryer { + mock := &MockRuntimeQueryer{ctrl: ctrl} + mock.recorder = &MockRuntimeQueryerMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockRuntimeQueryer) EXPECT() *MockRuntimeQueryerMockRecorder { + return m.recorder +} + +// GoroutineCount mocks base method. +func (m *MockRuntimeQueryer) GoroutineCount() int { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GoroutineCount") + ret0, _ := ret[0].(int) + return ret0 +} + +// GoroutineCount indicates an expected call of GoroutineCount. +func (mr *MockRuntimeQueryerMockRecorder) GoroutineCount() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GoroutineCount", reflect.TypeOf((*MockRuntimeQueryer)(nil).GoroutineCount)) +} diff --git a/cgroups_test.go b/queryer/queryer_test.go similarity index 76% rename from cgroups_test.go rename to queryer/queryer_test.go index 2eb29ef..1386009 100644 --- a/cgroups_test.go +++ b/queryer/queryer_test.go @@ -1,7 +1,7 @@ //go:build linux // +build linux -package autopprof +package queryer import ( "testing" @@ -9,9 +9,9 @@ import ( "github.com/containerd/cgroups" ) -func TestNewQueryer(t *testing.T) { +func TestNewCgroupQueryer(t *testing.T) { mode := cgroups.Mode() - _, err := newQueryer() + _, err := NewCgroupQueryer() if mode == cgroups.Unavailable && err == nil { t.Errorf("newQueryer() = nil, want error") } else if err != nil { diff --git a/queue.go b/queryer/queue.go similarity index 99% rename from queue.go rename to queryer/queue.go index 23f886e..c62da7d 100644 --- a/queue.go +++ b/queryer/queue.go @@ -1,4 +1,4 @@ -package autopprof +package queryer import "time" diff --git a/queue_test.go b/queryer/queue_test.go similarity index 99% rename from queue_test.go rename to queryer/queue_test.go index c03cbeb..df34312 100644 --- a/queue_test.go +++ b/queryer/queue_test.go @@ -1,4 +1,4 @@ -package autopprof +package queryer import ( "testing" diff --git a/queryer/runtime.go b/queryer/runtime.go new file mode 100644 index 0000000..2f0669b --- /dev/null +++ b/queryer/runtime.go @@ -0,0 +1,14 @@ +package queryer + +import "runtime/pprof" + +type runtimeQuery struct { +} + +func newRuntimeQuery() *runtimeQuery { + return &runtimeQuery{} +} + +func (r runtimeQuery) GoroutineCount() int { + return pprof.Lookup("goroutine").Count() +} diff --git a/queryer/runtime_test.go b/queryer/runtime_test.go new file mode 100644 index 0000000..39fcfa6 --- /dev/null +++ b/queryer/runtime_test.go @@ -0,0 +1,39 @@ +package queryer + +import ( + "sync" + "testing" + "time" +) + +func Test_runtimeQuery_GoroutineCount(t *testing.T) { + r := newRuntimeQuery() + + initGoroutineCnt := r.GoroutineCount() + if initGoroutineCnt < 1 { + t.Errorf("GoroutineCount() = %d; want is > 0", initGoroutineCnt) + } + + wg := sync.WaitGroup{} + + goroutineCnt := 1000 + for i := 0; i < goroutineCnt; i++ { + wg.Add(1) + go func() { + time.Sleep(500 * time.Millisecond) + wg.Done() + }() + } + + processingGoroutineCnt := r.GoroutineCount() + if processingGoroutineCnt != initGoroutineCnt+goroutineCnt { + t.Errorf("GoroutineCount() = %d; want is %d", processingGoroutineCnt, initGoroutineCnt+1) + } + + wg.Wait() + + remainedGoroutineCnt := r.GoroutineCount() + if remainedGoroutineCnt != initGoroutineCnt { + t.Errorf("GoroutineCount() = %d; want is %d", remainedGoroutineCnt, initGoroutineCnt) + } +} diff --git a/report/report.go b/report/report.go index 9e43cfc..ee7513e 100644 --- a/report/report.go +++ b/report/report.go @@ -15,6 +15,10 @@ const ( // HeapProfileFilenameFmt is the filename format for the heap profile. // pprof...alloc_objects.alloc_space.inuse_objects.inuse_space..pprof. HeapProfileFilenameFmt = "pprof.%s.%s.alloc_objects.alloc_space.inuse_objects.inuse_space.%s.pprof" + + // GoroutineProfileFilenameFmt is the filename format for the goroutine profile. + // pprof...goroutine..pprof. + GoroutineProfileFilenameFmt = "pprof.%s.%s.goroutine.%s.pprof" ) // Reporter is responsible for reporting the profiling report to the destination. @@ -24,6 +28,9 @@ type Reporter interface { // ReportHeapProfile sends the heap profiling data to the specific destination. ReportHeapProfile(ctx context.Context, r io.Reader, mi MemInfo) error + + // ReportGoroutineProfile sends the goroutine profiling data to the specific destination. + ReportGoroutineProfile(ctx context.Context, r io.Reader, gi GoroutineInfo) error } // CPUInfo is the CPU usage information. @@ -37,3 +44,8 @@ type MemInfo struct { ThresholdPercentage float64 UsagePercentage float64 } + +type GoroutineInfo struct { + ThresholdCount int + Count int +} diff --git a/report/report_mock.go b/report/report_mock.go index 873ba39..56a4c62 100644 --- a/report/report_mock.go +++ b/report/report_mock.go @@ -1,5 +1,5 @@ // Code generated by MockGen. DO NOT EDIT. -// Source: report.go +// Source: report/report.go // Package report is a generated GoMock package. package report @@ -49,6 +49,20 @@ func (mr *MockReporterMockRecorder) ReportCPUProfile(ctx, r, ci interface{}) *go return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReportCPUProfile", reflect.TypeOf((*MockReporter)(nil).ReportCPUProfile), ctx, r, ci) } +// ReportGoroutineProfile mocks base method. +func (m *MockReporter) ReportGoroutineProfile(ctx context.Context, r io.Reader, gi GoroutineInfo) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ReportGoroutineProfile", ctx, r, gi) + ret0, _ := ret[0].(error) + return ret0 +} + +// ReportGoroutineProfile indicates an expected call of ReportGoroutineProfile. +func (mr *MockReporterMockRecorder) ReportGoroutineProfile(ctx, r, gi interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReportGoroutineProfile", reflect.TypeOf((*MockReporter)(nil).ReportGoroutineProfile), ctx, r, gi) +} + // ReportHeapProfile mocks base method. func (m *MockReporter) ReportHeapProfile(ctx context.Context, r io.Reader, mi MemInfo) error { m.ctrl.T.Helper() diff --git a/report/slack.go b/report/slack.go index 0769b20..27ab319 100644 --- a/report/slack.go +++ b/report/slack.go @@ -13,8 +13,9 @@ import ( const ( reportTimeLayout = "2006-01-02T150405.MST" - cpuCommentFmt = ":rotating_light:[CPU] usage (*%.2f%%*) > threshold (*%.2f%%*)" - memCommentFmt = ":rotating_light:[MEM] usage (*%.2f%%*) > threshold (*%.2f%%*)" + cpuCommentFmt = ":rotating_light:[CPU] usage (*%.2f%%*) > threshold (*%.2f%%*)" + memCommentFmt = ":rotating_light:[MEM] usage (*%.2f%%*) > threshold (*%.2f%%*)" + goroutineCommentFmt = ":rotating_light:[GOROUTINE] count (*%d*) > threshold (*%d*)" ) // SlackReporter is the reporter to send the profiling report to the @@ -85,3 +86,25 @@ func (s *SlackReporter) ReportHeapProfile( } return nil } + +// ReportGoroutineProfile sends the goroutine profiling data to the Slack. +func (s *SlackReporter) ReportGoroutineProfile( + ctx context.Context, r io.Reader, gi GoroutineInfo, +) error { + hostname, _ := os.Hostname() // Don't care about this error. + var ( + now = time.Now().Format(reportTimeLayout) + filename = fmt.Sprintf(GoroutineProfileFilenameFmt, s.app, hostname, now) + comment = fmt.Sprintf(goroutineCommentFmt, gi.Count, gi.ThresholdCount) + ) + if _, err := s.client.UploadFileContext(ctx, slack.FileUploadParameters{ + Reader: r, + Filename: filename, + Title: filename, + InitialComment: comment, + Channels: []string{s.channel}, + }); err != nil { + return fmt.Errorf("autopprof: failed to upload a file to Slack channel: %w", err) + } + return nil +}