Skip to content

Commit

Permalink
[FEED-8337] feat: add goroutine watcher
Browse files Browse the repository at this point in the history
  • Loading branch information
jake-shin0 committed Apr 22, 2024
1 parent be7f157 commit ecd1932
Show file tree
Hide file tree
Showing 27 changed files with 746 additions and 312 deletions.
105 changes: 94 additions & 11 deletions autopprof.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ import (
"bytes"
"context"
"fmt"
"github.com/daangn/autopprof/queryer"
"log"
"runtime/pprof"
"time"

"github.com/daangn/autopprof/report"
Expand All @@ -34,13 +36,22 @@ type autoPprof struct {
// Default: 0.75. (mean 75%)
memThreshold float64

// goroutineThreshold is the goroutine count threshold to trigger profile.
// If the goroutine count is over the threshold, the autopprof will
// report the goroutine profile.
// Default: 10000.
goroutineThreshold int

// minConsecutiveOverThreshold is the minimum consecutive
// number of over a threshold for reporting profile again.
// Default: 12.
minConsecutiveOverThreshold int

// queryer is used to query the quota and the cgroup stat.
queryer queryer
// cgroupQueryer is used to query the quota and the cgroup stat.
cgroupQueryer queryer.CgroupsQueryer

// runtimeQueryer is used to query the runtime stat.
runtimeQueryer queryer.RuntimeQueryer

// profiler is used to profile the cpu and the heap memory.
profiler profiler
Expand All @@ -53,8 +64,9 @@ type autoPprof struct {
reportBoth bool

// Flags to disable the profiling.
disableCPUProf bool
disableMemProf bool
disableCPUProf bool
disableMemProf bool
disableGoroutineProf bool

// stopC is the signal channel to stop the watch processes.
stopC chan struct{}
Expand All @@ -65,21 +77,26 @@ var globalAp *autoPprof

// Start configures and runs the autopprof process.
func Start(opt Option) error {
qryer, err := newQueryer()
cgroupQryer, err := queryer.NewCgroupQueryer()
if err != nil {
return err
}
if err := opt.validate(); err != nil {
return err
}

pprof.Profiles()
runtimeQryer, err := queryer.NewRuntimeQueryer()

profr := newDefaultProfiler(defaultCPUProfilingDuration)
ap := &autoPprof{
watchInterval: defaultWatchInterval,
cpuThreshold: defaultCPUThreshold,
memThreshold: defaultMemThreshold,
goroutineThreshold: defaultGoroutineThreshold,
minConsecutiveOverThreshold: defaultMinConsecutiveOverThreshold,
queryer: qryer,
cgroupQueryer: cgroupQryer,
runtimeQueryer: runtimeQryer,
profiler: profr,
reporter: opt.Reporter,
reportBoth: opt.ReportBoth,
Expand All @@ -93,6 +110,9 @@ func Start(opt Option) error {
if opt.MemThreshold != 0 {
ap.memThreshold = opt.MemThreshold
}
if opt.GoroutineThreshold != 0 {
ap.goroutineThreshold = opt.GoroutineThreshold
}
if !ap.disableCPUProf {
if err := ap.loadCPUQuota(); err != nil {
return err
Expand All @@ -112,7 +132,7 @@ func Stop() {
}

func (ap *autoPprof) loadCPUQuota() error {
err := ap.queryer.setCPUQuota()
err := ap.cgroupQueryer.SetCPUQuota()
if err == nil {
return nil
}
Expand All @@ -134,6 +154,7 @@ func (ap *autoPprof) loadCPUQuota() error {
func (ap *autoPprof) watch() {
go ap.watchCPUUsage()
go ap.watchMemUsage()
go ap.watchGoroutineCount()
<-ap.stopC
}

Expand All @@ -149,7 +170,7 @@ func (ap *autoPprof) watchCPUUsage() {
for {
select {
case <-ticker.C:
usage, err := ap.queryer.cpuUsage()
usage, err := ap.cgroupQueryer.CpuUsage()
if err != nil {
log.Println(err)
return
Expand All @@ -170,7 +191,7 @@ func (ap *autoPprof) watchCPUUsage() {
))
}
if ap.reportBoth && !ap.disableMemProf {
memUsage, err := ap.queryer.memUsage()
memUsage, err := ap.cgroupQueryer.MemUsage()
if err != nil {
log.Println(err)
return
Expand Down Expand Up @@ -226,7 +247,7 @@ func (ap *autoPprof) watchMemUsage() {
for {
select {
case <-ticker.C:
usage, err := ap.queryer.memUsage()
usage, err := ap.cgroupQueryer.MemUsage()
if err != nil {
log.Println(err)
return
Expand All @@ -247,7 +268,7 @@ func (ap *autoPprof) watchMemUsage() {
))
}
if ap.reportBoth && !ap.disableCPUProf {
cpuUsage, err := ap.queryer.cpuUsage()
cpuUsage, err := ap.cgroupQueryer.CpuUsage()
if err != nil {
log.Println(err)
return
Expand Down Expand Up @@ -291,6 +312,68 @@ func (ap *autoPprof) reportHeapProfile(memUsage float64) error {
return nil
}

func (ap *autoPprof) watchGoroutineCount() {
if ap.disableGoroutineProf {
return
}

ticker := time.NewTicker(ap.watchInterval)
defer ticker.Stop()

var consecutiveOverThresholdCnt int
for {
select {
case <-ticker.C:
count := ap.runtimeQueryer.GoroutineCount()

if count < ap.goroutineThreshold {
// Reset the count if the goroutine count goes under the threshold.
consecutiveOverThresholdCnt = 0
continue
}

// If goroutine count remains high for a short period of time, no
// duplicate reports are sent.
// This is to prevent the autopprof from sending too many reports.
if consecutiveOverThresholdCnt == 0 {
if err := ap.reportGoroutineProfile(count); err != nil {
log.Println(fmt.Errorf(
"autopprof: failed to report the goroutine profile: %w", err,
))
}
}

consecutiveOverThresholdCnt++
if consecutiveOverThresholdCnt >= ap.minConsecutiveOverThreshold {
// Reset the count and ready to report the cpu profile again.
consecutiveOverThresholdCnt = 0
}
case <-ap.stopC:
return
}
}
}

func (ap *autoPprof) reportGoroutineProfile(goroutineCount int) error {
b, err := ap.profiler.profileGoroutine()
if err != nil {
return fmt.Errorf("autopprof: failed to profile the goroutine: %w", err)
}

ctx, cancel := context.WithTimeout(context.Background(), reportTimeout)
defer cancel()

ci := report.GoroutineInfo{
ThresholdCount: ap.goroutineThreshold,
Count: goroutineCount,
}
bReader := bytes.NewReader(b)
if err := ap.reporter.ReportGoroutineProfile(ctx, bReader, ci); err != nil {
return err
}
return nil
}

func (ap *autoPprof) stop() {
close(ap.stopC)
}
Loading

0 comments on commit ecd1932

Please sign in to comment.