diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 1d398f9..4d2121a 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -17,7 +17,7 @@ jobs: strategy: matrix: os-version: [ 22.04, 24.04 ] - go-version: [ 1.17, 1.18, 1.19 ] + go-version: [ '1.19', '1.20', '1.21', '1.22' ] runs-on: ubuntu-${{ matrix.os-version }} steps: - uses: actions/checkout@v1 @@ -31,7 +31,7 @@ jobs: strategy: matrix: os-version: [ 13, 14, 15 ] - go-version: [ 1.17, 1.18, 1.19 ] + go-version: [ '1.19', '1.20', '1.21', '1.22' ] runs-on: macos-${{ matrix.os-version }} steps: - uses: actions/checkout@v1 diff --git a/README.md b/README.md index 19af268..25019ad 100644 --- a/README.md +++ b/README.md @@ -18,7 +18,7 @@ e.g. Slack). ## Installation ```bash -go get -u github.com/daangn/autopprof +go get -u github.com/daangn/autopprof/v2 ``` ## Usage @@ -33,17 +33,17 @@ import ( "errors" "log" - "github.com/daangn/autopprof" - "github.com/daangn/autopprof/report" + "github.com/daangn/autopprof/v2" + "github.com/daangn/autopprof/v2/report" ) func main() { err := autopprof.Start(autopprof.Option{ + App: "YOUR_APP_NAME", CPUThreshold: 0.8, // Default: 0.75. MemThreshold: 0.8, // Default: 0.75. Reporter: report.NewSlackReporter( &report.SlackReporterOption{ - App: "YOUR_APP_NAME", Token: "YOUR_TOKEN_HERE", ChannelID: "REPORT_CHANNEL_ID", }, @@ -63,6 +63,161 @@ func main() { > You can create a custom reporter by implementing the `report.Reporter` interface. +## Custom metrics + +Beyond the built-in CPU / memory / goroutine watchers, you can register your own +`Metric` — useful for domain signals that only the owning struct knows about +(connection-pool usage, queue backlog, cache hit ratio, …). + +```go +type Pool struct { /* ... */ } + +func (p *Pool) Name() string { return "db_pool" } +func (p *Pool) Threshold() float64 { return 0.9 } +func (p *Pool) Interval() time.Duration { return 10 * time.Second } // 0 → global watchInterval +func (p *Pool) Query() (float64, error) { return p.Usage(), nil } +func (p *Pool) Collect(v float64) (autopprof.CollectResult, error) { + snap, err := p.Snapshot() + if err != nil { + return autopprof.CollectResult{}, err + } + return autopprof.CollectResult{ + Reader: bytes.NewReader(snap), + Filename: fmt.Sprintf("db_pool_%d.dump", time.Now().Unix()), + Comment: fmt.Sprintf(":rotating_light:[pool] %.2f ≥ 0.90", v), + }, nil +} + +// After autopprof.Start(...) — typically inside the struct's constructor: +_ = autopprof.Register(pool) +defer autopprof.Unregister("db_pool") +``` + +For one-off hooks you can skip the custom type entirely: + +```go +_ = autopprof.Register(autopprof.NewMetric( + "goroutine_blocked", 100, 5*time.Second, + func() (float64, error) { return float64(runtime.NumGoroutine()), nil }, + func(v float64) (autopprof.CollectResult, error) { /* ... */ }, +)) +``` + +The names `cpu`, `mem`, and `goroutine` are reserved for the built-in metrics. +User metrics do **not** participate in the built-in cascade. + +A built-in breach reports every other enabled built-in in addition to the +triggering one. Set `DisableCPUProf`, `DisableMemProf`, or +`DisableGoroutineProf` to opt a built-in out — it leaves the watcher and +the cascade in one step. + +## Migrating from v1 to v2 + +v2 unifies CPU / Mem / Goroutine / Custom under a single `Metric` interface +and narrows the `Reporter` surface. This section lists every change a v1 +caller needs to make. + +### 1. Update the module path + +```diff +- import "github.com/daangn/autopprof" ++ import "github.com/daangn/autopprof/v2" +``` + +Update `go.mod`: + +```bash +go get github.com/daangn/autopprof/v2 +``` + +### 2. `Option` changes + +| v1 | v2 | +|---|---| +| `ReportBoth bool` | **Removed.** Cascade is always on for enabled built-ins. | +| `ReportAll bool` | **Removed.** Cascade is always on for enabled built-ins. | +| *(n/a)* | `App string` — the `""` segment of built-in filenames. Defaults to `"autopprof"` when empty. | +| *(n/a)* | `Metrics []Metric` — user-defined metrics to register at `Start`. | + +All other fields (`CPUThreshold`, `MemThreshold`, `GoroutineThreshold`, +`Disable*Prof`, `Reporter`) are unchanged. Disable individual built-ins +via `Disable*Prof` — they're excluded from the cascade as well. + +```diff + autopprof.Start(autopprof.Option{ ++ App: "YOUR_APP_NAME", + CPUThreshold: 0.8, +- ReportBoth: true, + Reporter: myReporter, + }) +``` + +### 3. `SlackReporterOption` changes + +```diff + report.NewSlackReporter(&report.SlackReporterOption{ +- App: "YOUR_APP_NAME", // moved to Option.App + Token: "YOUR_TOKEN_HERE", +- Channel: "old-channel-name", // removed; Slack API dropped channel-name uploads + ChannelID: "REPORT_CHANNEL_ID", + }) +``` + +### 4. `Reporter` interface (4 methods → 1) + +```diff +-type Reporter interface { +- ReportCPUProfile(ctx context.Context, r io.Reader, ci CPUInfo) error +- ReportHeapProfile(ctx context.Context, r io.Reader, mi MemInfo) error +- ReportGoroutineProfile(ctx context.Context, r io.Reader, gi GoroutineInfo) error +-} ++type Reporter interface { ++ Report(ctx context.Context, r io.Reader, info ReportInfo) error ++} ++ ++type ReportInfo struct { ++ MetricName string // "cpu", "mem", "goroutine", or user-defined name ++ Filename string ++ Comment string ++ Value float64 ++ Threshold float64 ++} +``` + +Custom `Reporter` implementations should route on `info.MetricName`: + +```go +func (r *MyReporter) Report(ctx context.Context, reader io.Reader, info report.ReportInfo) error { + switch info.MetricName { + case "cpu": + return r.sendCPU(ctx, reader, info.Value*100, info.Threshold*100) + case "mem": + return r.sendMem(ctx, reader, info.Value*100, info.Threshold*100) + case "goroutine": + return r.sendGoroutine(ctx, reader, int(info.Value), int(info.Threshold)) + default: + return r.sendCustom(ctx, reader, info) + } +} +``` + +Removed types from the `report` package: `CPUInfo`, `MemInfo`, `GoroutineInfo`, +`CPUProfileFilenameFmt`, `HeapProfileFilenameFmt`, `GoroutineProfileFilenameFmt`. + +### 5. Bug fixes carried in v2 + +- `Option.DisableGoroutineProf` was silently ignored in v1 (the value + wasn't assigned to the internal struct at `Start` time). It now takes + effect correctly, which may change observed behavior for callers + relying on the flag. +- Cascade (the v1 `ReportAll: true` behavior) is now unconditional for + enabled built-ins; use `Disable*Prof` to opt specific metrics out. + +### 6. New: custom metrics + +v2 lets you register your own `Metric`. See the **Custom metrics** section +above — this is the main reason to migrate. + ## Benchmark Benchmark the overhead of watching the CPU and memory utilization. The overhead is very @@ -76,12 +231,16 @@ small, so we don't have to worry about the performance degradation. > ``` -BenchmarkLightJob-5 49444164 245.6 ns/op 0 B/op 0 allocs/op -BenchmarkLightJobWithWatchCPUUsage-5 48884026 250.1 ns/op 0 B/op 0 allocs/op -BenchmarkLightJobWithWatchMemUsage-5 49036617 246.3 ns/op 0 B/op 0 allocs/op -BenchmarkHeavyJob-5 59010 203759 ns/op 0 B/op 0 allocs/op -BenchmarkHeavyJobWithWatchCPUUsage-5 58915 204054 ns/op 2 B/op 0 allocs/op -BenchmarkHeavyJobWithWatchMemUsage-5 58850 204764 ns/op 2 B/op 0 allocs/op +BenchmarkLightJob-12 98078731 134.5 ns/op 0 B/op 0 allocs/op +BenchmarkLightJobWithWatchCPUUsage-12 92799849 133.0 ns/op 0 B/op 0 allocs/op +BenchmarkLightJobWithWatchMemUsage-12 96778594 128.3 ns/op 0 B/op 0 allocs/op +BenchmarkHeavyJob-12 105848 106606 ns/op 0 B/op 0 allocs/op +BenchmarkHeavyJobWithWatchCPUUsage-12 113047 112734 ns/op 1 B/op 0 allocs/op +BenchmarkHeavyJobWithWatchMemUsage-12 101102 133426 ns/op 1 B/op 0 allocs/op +BenchmarkLightAsyncJob-12 399696 29953 ns/op 7040 B/op 352 allocs/op +BenchmarkLightAsyncJobWithWatchGoroutineCount-12 347266 34259 ns/op 7040 B/op 352 allocs/op +BenchmarkHeavyAsyncJob-12 452 26689023 ns/op 6002064 B/op 300097 allocs/op +BenchmarkHeavyAsyncJobWithWatchGoroutineCount-12 414 27350318 ns/op 5973015 B/op 298647 allocs/op ``` ## License diff --git a/autopprof.go b/autopprof.go index 87b3a70..3d708f5 100644 --- a/autopprof.go +++ b/autopprof.go @@ -4,88 +4,78 @@ package autopprof import ( - "bytes" "context" "fmt" "log" + "sync" "time" - "github.com/daangn/autopprof/queryer" - "github.com/daangn/autopprof/report" + "github.com/daangn/autopprof/v2/queryer" + "github.com/daangn/autopprof/v2/report" ) -const ( - reportTimeout = 5 * time.Second -) +const reportTimeout = 5 * time.Second type autoPprof struct { - // watchInterval is the interval to watch the resource usages. - // Default: 5s. - watchInterval time.Duration - - // cpuThreshold is the cpu usage threshold to trigger profile. - // If the cpu usage is over the threshold, the autopprof will - // report the cpu profile. - // Default: 0.75. (mean 75%) - cpuThreshold float64 - - // memThreshold is the memory usage threshold to trigger profile. - // If the memory usage is over the threshold, the autopprof will - // report the heap profile. - // Default: 0.75. (mean 75%) - memThreshold float64 - - // goroutineThreshold is the goroutine count threshold to trigger profile. - // If the goroutine count is over the threshold, the autopprof will - // report the goroutine profile. - // Default: 50000. - goroutineThreshold int - - // minConsecutiveOverThreshold is the minimum consecutive - // number of over a threshold for reporting profile again. - // Default: 12. + watchInterval time.Duration minConsecutiveOverThreshold int - // cgroupQueryer is used to query the quota and the cgroup stat. - cgroupQueryer queryer.CgroupsQueryer - - // runtimeQueryer is used to query the runtime stat. - runtimeQueryer queryer.RuntimeQueryer - - // profiler is used to profile the cpu and the heap memory. - profiler profiler - - // reporter is the reporter to send the profiling reports. reporter report.Reporter + app string - // deprecated: use reportAll instead. - // reportBoth sets whether to trigger reports for both CPU and memory when either threshold is exceeded. - // If some profiling is disabled, exclude it. - reportBoth bool - - // reportAll sets whether to trigger reports for all profiling types when any threshold is exceeded. - // If some profiling is disabled, exclude it. - reportAll bool - - // Flags to disable the profiling. disableCPUProf bool disableMemProf bool disableGoroutineProf bool - // stopC is the signal channel to stop the watch processes. - stopC chan struct{} + cgroupQueryer queryer.CgroupsQueryer + runtimeQueryer queryer.RuntimeQueryer + profiler profiler + + // cascadedRunners holds only the built-in metrics so cascadeBuiltIn + // can iterate them. Populated during Start and thereafter read-only — + // no mutex needed. + cascadedRunners map[string]*metricRunner + + // wg tracks every live watcher goroutine so Stop blocks until + // in-flight pprof work (CPU profiling runs up to ~10s) unwinds. + wg sync.WaitGroup + stopOnce sync.Once + stopC chan struct{} +} + +type metricRunner struct { + metric Metric + name string + threshold float64 + interval time.Duration } -// globalAp is the global autopprof instance. -var globalAp *autoPprof +// globalAp is the running instance, or nil before Start. Access is +// guarded by startOnce / stopOnce — Start and Stop each fire at most +// once per process. +var ( + globalAp *autoPprof + startOnce sync.Once + startErr error + stopOnce sync.Once +) -// Start configures and runs the autopprof process. +// Start configures and runs the autopprof process. It executes at +// most once per process — subsequent calls return the same error (or +// nil) as the first invocation. Safe to call concurrently; later +// callers block on the first one. func Start(opt Option) error { + startOnce.Do(func() { + startErr = start(opt) + }) + return startErr +} + +func start(opt Option) error { cgroupQryer, err := queryer.NewCgroupQueryer() if err != nil { return err } - runtimeQryer, err := queryer.NewRuntimeQueryer() if err != nil { return err @@ -94,62 +84,72 @@ func Start(opt Option) error { return err } + app := opt.App + if app == "" { + app = defaultApp + } profr := newDefaultProfiler(defaultCPUProfilingDuration) ap := &autoPprof{ watchInterval: defaultWatchInterval, - cpuThreshold: defaultCPUThreshold, - memThreshold: defaultMemThreshold, - goroutineThreshold: defaultGoroutineThreshold, minConsecutiveOverThreshold: defaultMinConsecutiveOverThreshold, - cgroupQueryer: cgroupQryer, - runtimeQueryer: runtimeQryer, - profiler: profr, reporter: opt.Reporter, - reportBoth: opt.ReportBoth, + app: app, disableCPUProf: opt.DisableCPUProf, disableMemProf: opt.DisableMemProf, + disableGoroutineProf: opt.DisableGoroutineProf, + cgroupQueryer: cgroupQryer, + runtimeQueryer: runtimeQryer, + profiler: profr, + cascadedRunners: make(map[string]*metricRunner), stopC: make(chan struct{}), } - if opt.CPUThreshold != 0 { - ap.cpuThreshold = opt.CPUThreshold - } - if opt.MemThreshold != 0 { - ap.memThreshold = opt.MemThreshold - } - if opt.GoroutineThreshold != 0 { - ap.goroutineThreshold = opt.GoroutineThreshold - } if !ap.disableCPUProf { if err := ap.loadCPUQuota(); err != nil { return err } } - - go ap.watch() + ap.registerBuiltinMetrics(opt) + for _, m := range opt.Metrics { + if err := ap.registerMetric(m); err != nil { + ap.stop() + return err + } + } globalAp = ap return nil } -// Stop stops the global autopprof process. +// Stop stops the global autopprof process. It executes at most once +// per process; subsequent calls are no-ops. Safe to call concurrently. func Stop() { - if globalAp != nil { + stopOnce.Do(func() { + if globalAp == nil { + return + } globalAp.stop() + globalAp = nil + }) +} + +// Register adds a user Metric to the running autopprof instance. The +// metric's watcher runs until Stop. +func Register(m Metric) error { + if globalAp == nil { + return ErrNotStarted } + return globalAp.registerMetric(m) } +// loadCPUQuota resolves the container CPU limit. If the cgroup quota +// isn't set we log and silently disable CPU profiling (matching v1). func (ap *autoPprof) loadCPUQuota() error { err := ap.cgroupQueryer.SetCPUQuota() if err == nil { return nil } - - // If memory profiling is disabled and CPU quota isn't set, - // returns an error immediately. if ap.disableMemProf { return err } - // If memory profiling is enabled, just logs the error and - // disables the cpu profiling. log.Println( "autopprof: disable the cpu profiling due to the CPU quota isn't set", ) @@ -157,159 +157,119 @@ func (ap *autoPprof) loadCPUQuota() error { return nil } -func (ap *autoPprof) watch() { - go ap.watchCPUUsage() - go ap.watchMemUsage() - go ap.watchGoroutineCount() - <-ap.stopC -} - -func (ap *autoPprof) watchCPUUsage() { - if ap.disableCPUProf { - return +func (ap *autoPprof) registerBuiltinMetrics(opt Option) { + cpuThreshold := defaultCPUThreshold + if opt.CPUThreshold != 0 { + cpuThreshold = opt.CPUThreshold } - - ticker := time.NewTicker(ap.watchInterval) - defer ticker.Stop() - - var consecutiveOverThresholdCnt int - for { - select { - case <-ticker.C: - usage, err := ap.cgroupQueryer.CPUUsage() - if err != nil { - log.Println(err) - return - } - if usage < ap.cpuThreshold { - // Reset the count if the cpu usage goes under the threshold. - consecutiveOverThresholdCnt = 0 - continue - } - - // If cpu utilization remains high for a short period of time, no - // duplicate reports are sent. - // This is to prevent the autopprof from sending too many reports. - if consecutiveOverThresholdCnt == 0 { - if err := ap.reportCPUProfile(usage); err != nil { - log.Println(fmt.Errorf( - "autopprof: failed to report the cpu profile: %w", err, - )) - } - - if (ap.reportBoth || ap.reportAll) && !ap.disableMemProf { - memUsage, err := ap.cgroupQueryer.MemUsage() - if err != nil { - log.Println(err) - return - } - if err := ap.reportHeapProfile(memUsage); err != nil { - log.Println(fmt.Errorf( - "autopprof: failed to report the heap profile: %w", err, - )) - } - } - - if ap.reportAll && !ap.disableGoroutineProf { - goroutineCount := ap.runtimeQueryer.GoroutineCount() - if err := ap.reportGoroutineProfile(goroutineCount); err != nil { - log.Println(fmt.Errorf( - "autopprof: failed to report the goroutine profile: %w", err, - )) - } - } - } - - consecutiveOverThresholdCnt++ - if consecutiveOverThresholdCnt >= ap.minConsecutiveOverThreshold { - // Reset the count and ready to report the cpu profile again. - consecutiveOverThresholdCnt = 0 - } - case <-ap.stopC: - return - } + memThreshold := defaultMemThreshold + if opt.MemThreshold != 0 { + memThreshold = opt.MemThreshold + } + goroutineThreshold := defaultGoroutineThreshold + if opt.GoroutineThreshold != 0 { + goroutineThreshold = opt.GoroutineThreshold } -} -func (ap *autoPprof) reportCPUProfile(cpuUsage float64) error { - b, err := ap.profiler.profileCPU() - if err != nil { - return fmt.Errorf("autopprof: failed to profile the cpu: %w", err) + if !ap.disableCPUProf { + ap.registerBuiltIn(&cpuMetric{ + app: ap.app, threshold: cpuThreshold, + cg: ap.cgroupQueryer, p: ap.profiler, + }) + } + if !ap.disableMemProf { + ap.registerBuiltIn(&memMetric{ + app: ap.app, threshold: memThreshold, + cg: ap.cgroupQueryer, p: ap.profiler, + }) } + if !ap.disableGoroutineProf { + ap.registerBuiltIn(&goroutineMetric{ + app: ap.app, threshold: goroutineThreshold, + rt: ap.runtimeQueryer, p: ap.profiler, + }) + } +} - ctx, cancel := context.WithTimeout(context.Background(), reportTimeout) - defer cancel() +func (ap *autoPprof) registerBuiltIn(m Metric) { + runner := newRunner(m, ap.watchInterval) + ap.cascadedRunners[runner.name] = runner + ap.wg.Add(1) + go func() { + defer ap.wg.Done() + ap.watchMetric(runner, true) + }() +} - ci := report.CPUInfo{ - ThresholdPercentage: ap.cpuThreshold * 100, - UsagePercentage: cpuUsage * 100, - } - bReader := bytes.NewReader(b) - if err := ap.reporter.ReportCPUProfile(ctx, bReader, ci); err != nil { +func (ap *autoPprof) registerMetric(m Metric) error { + if err := validateMetric(m); err != nil { return err } + select { + case <-ap.stopC: + return ErrNotStarted + default: + } + runner := newRunner(m, ap.watchInterval) + ap.wg.Add(1) + go func() { + defer ap.wg.Done() + ap.watchMetric(runner, false) + }() return nil } -func (ap *autoPprof) watchMemUsage() { - if ap.disableMemProf { - return +// newRunner caches Metric's meta values so the watch loop uses a +// stable name/threshold/interval even if the implementation mutates +// them later. +func newRunner(m Metric, globalInterval time.Duration) *metricRunner { + interval := m.Interval() + if interval == 0 { + interval = globalInterval } + return &metricRunner{ + metric: m, + name: m.Name(), + threshold: m.Threshold(), + interval: interval, + } +} - ticker := time.NewTicker(ap.watchInterval) +// watchMetric runs the unified watch loop. minConsecutiveOverThreshold +// debounces repeat fires: report on the first tick above threshold, +// suppress until the counter drops below threshold or wraps around. +func (ap *autoPprof) watchMetric(runner *metricRunner, isBuiltin bool) { + ticker := time.NewTicker(runner.interval) defer ticker.Stop() - var consecutiveOverThresholdCnt int + var cnt int for { select { case <-ticker.C: - usage, err := ap.cgroupQueryer.MemUsage() + value, err := runner.metric.Query() if err != nil { - log.Println(err) + log.Println(fmt.Errorf( + "autopprof: metric %q query failed: %w", runner.name, err, + )) return } - if usage < ap.memThreshold { - // Reset the count if the memory usage goes under the threshold. - consecutiveOverThresholdCnt = 0 + if value < runner.threshold { + cnt = 0 continue } - - // If memory utilization remains high for a short period of time, - // no duplicate reports are sent. - // This is to prevent the autopprof from sending too many reports. - if consecutiveOverThresholdCnt == 0 { - if err := ap.reportHeapProfile(usage); err != nil { + if cnt == 0 { + if err := ap.fireReport(runner, value); err != nil { log.Println(fmt.Errorf( - "autopprof: failed to report the heap profile: %w", err, + "autopprof: metric %q report failed: %w", runner.name, err, )) } - if (ap.reportBoth || ap.reportAll) && !ap.disableCPUProf { - cpuUsage, err := ap.cgroupQueryer.CPUUsage() - if err != nil { - log.Println(err) - return - } - if err := ap.reportCPUProfile(cpuUsage); err != nil { - log.Println(fmt.Errorf( - "autopprof: failed to report the cpu profile: %w", err, - )) - } - } - - if ap.reportAll && !ap.disableGoroutineProf { - goroutineCount := ap.runtimeQueryer.GoroutineCount() - if err := ap.reportGoroutineProfile(goroutineCount); err != nil { - log.Println(fmt.Errorf( - "autopprof: failed to report the goroutine profile: %w", err, - )) - } + if isBuiltin { + ap.cascadeBuiltIn(runner.name) } } - - consecutiveOverThresholdCnt++ - if consecutiveOverThresholdCnt >= ap.minConsecutiveOverThreshold { - // Reset the count and ready to report the heap profile again. - consecutiveOverThresholdCnt = 0 + cnt++ + if cnt >= ap.minConsecutiveOverThreshold { + cnt = 0 } case <-ap.stopC: return @@ -317,114 +277,64 @@ func (ap *autoPprof) watchMemUsage() { } } -func (ap *autoPprof) reportHeapProfile(memUsage float64) error { - b, err := ap.profiler.profileHeap() +func (ap *autoPprof) fireReport(runner *metricRunner, value float64) error { + result, err := runner.metric.Collect(value) if err != nil { - return fmt.Errorf("autopprof: failed to profile the heap: %w", err) - } - - ctx, cancel := context.WithTimeout(context.Background(), reportTimeout) - defer cancel() - - mi := report.MemInfo{ - ThresholdPercentage: ap.memThreshold * 100, - UsagePercentage: memUsage * 100, + return fmt.Errorf("collect: %w", err) } - bReader := bytes.NewReader(b) - if err := ap.reporter.ReportHeapProfile(ctx, bReader, mi); err != nil { - return err + if result.Reader == nil { + // Side-effect-only hook; nothing to ship. + return nil } - return nil -} -func (ap *autoPprof) watchGoroutineCount() { - if ap.disableGoroutineProf { - return + info := report.ReportInfo{ + MetricName: runner.name, + Filename: result.Filename, + Comment: result.Comment, + Value: value, + Threshold: runner.threshold, } - - ticker := time.NewTicker(ap.watchInterval) - defer ticker.Stop() - - var consecutiveOverThresholdCnt int - for { - select { - case <-ticker.C: - count := ap.runtimeQueryer.GoroutineCount() - - if count < ap.goroutineThreshold { - // Reset the count if the goroutine count goes under the threshold. - consecutiveOverThresholdCnt = 0 - continue - } - - // If goroutine count remains high for a short period of time, no - // duplicate reports are sent. - // This is to prevent the autopprof from sending too many reports. - if consecutiveOverThresholdCnt == 0 { - if err := ap.reportGoroutineProfile(count); err != nil { - log.Println(fmt.Errorf( - "autopprof: failed to report the goroutine profile: %w", err, - )) - } - - if ap.reportAll && !ap.disableCPUProf { - cpuUsage, err := ap.cgroupQueryer.CPUUsage() - if err != nil { - log.Println(err) - return - } - if err := ap.reportCPUProfile(cpuUsage); err != nil { - log.Println(fmt.Errorf( - "autopprof: failed to report the cpu profile: %w", err, - )) - } - } - - if ap.reportAll && !ap.disableMemProf { - memUsage, err := ap.cgroupQueryer.MemUsage() - if err != nil { - log.Println(err) - return - } - if err := ap.reportHeapProfile(memUsage); err != nil { - log.Println(fmt.Errorf( - "autopprof: failed to report the heap profile: %w", err, - )) - } - } - } - - consecutiveOverThresholdCnt++ - if consecutiveOverThresholdCnt >= ap.minConsecutiveOverThreshold { - // Reset the count and ready to report the goroutine profile again. - consecutiveOverThresholdCnt = 0 - } - case <-ap.stopC: - return - } + if info.Filename == "" { + info.Filename = defaultFilename(runner.name) } -} - -func (ap *autoPprof) reportGoroutineProfile(goroutineCount int) error { - b, err := ap.profiler.profileGoroutine() - if err != nil { - return fmt.Errorf("autopprof: failed to profile the goroutine: %w", err) + if info.Comment == "" { + info.Comment = defaultComment(runner.name, value, runner.threshold) } ctx, cancel := context.WithTimeout(context.Background(), reportTimeout) defer cancel() + return ap.reporter.Report(ctx, result.Reader, info) +} - gi := report.GoroutineInfo{ - ThresholdCount: ap.goroutineThreshold, - Count: goroutineCount, - } - bReader := bytes.NewReader(b) - if err := ap.reporter.ReportGoroutineProfile(ctx, bReader, gi); err != nil { - return err +// cascadeBuiltIn reports the other enabled built-in metrics whenever +// any built-in breaches. Custom metrics stay independent. +// cascadedRunners is read-only after Start, so no lock. +func (ap *autoPprof) cascadeBuiltIn(triggered string) { + for name, r := range ap.cascadedRunners { + if name == triggered { + continue + } + value, err := r.metric.Query() + if err != nil { + log.Println(fmt.Errorf( + "autopprof: cascade query %q: %w", r.name, err, + )) + continue + } + if err := ap.fireReport(r, value); err != nil { + log.Println(fmt.Errorf( + "autopprof: cascade report %q: %w", r.name, err, + )) + } } - return nil } +// stop signals every watcher and blocks until they exit. wg.Wait +// ensures Stop() doesn't return while pprof.StartCPUProfile is in +// flight. func (ap *autoPprof) stop() { - close(ap.stopC) + ap.stopOnce.Do(func() { + close(ap.stopC) + ap.wg.Wait() + }) } diff --git a/autopprof_test.go b/autopprof_test.go index 20b99ba..61d2223 100644 --- a/autopprof_test.go +++ b/autopprof_test.go @@ -4,1293 +4,673 @@ package autopprof import ( + "bytes" "context" "errors" + "fmt" "io" + "strings" "sync" + "sync/atomic" "testing" "time" - "github.com/daangn/autopprof/queryer" + "github.com/daangn/autopprof/v2/queryer" + "github.com/daangn/autopprof/v2/report" "github.com/golang/mock/gomock" - - "github.com/daangn/autopprof/report" ) -func TestStart(t *testing.T) { +// fakeMetric is the test stub for the Metric interface. Each field +// lets a test inject behavior without spinning up a real struct. +type fakeMetric struct { + nameVal string + thresholdVal float64 + intervalVal time.Duration + queryFn func() (float64, error) + collectFn func(v float64) (CollectResult, error) + queryCalls atomic.Int64 + collectCalls atomic.Int64 +} + +func (f *fakeMetric) Name() string { return f.nameVal } +func (f *fakeMetric) Threshold() float64 { return f.thresholdVal } +func (f *fakeMetric) Interval() time.Duration { return f.intervalVal } +func (f *fakeMetric) Query() (float64, error) { + f.queryCalls.Add(1) + if f.queryFn != nil { + return f.queryFn() + } + return 0, nil +} +func (f *fakeMetric) Collect(v float64) (CollectResult, error) { + f.collectCalls.Add(1) + if f.collectFn != nil { + return f.collectFn(v) + } + return CollectResult{}, nil +} + +// resetGlobal ensures test isolation — some tests Start() and don't +// Stop() cleanly; others test CAS behavior that needs the slot empty. +func resetGlobal() { + globalAp = nil + startOnce = sync.Once{} + startErr = nil + stopOnce = sync.Once{} +} + +// ------------------------------------------------------------------- +// Validation tests +// ------------------------------------------------------------------- + +func TestOption_validate(t *testing.T) { + validMetric := &fakeMetric{ + nameVal: "custom", + thresholdVal: 1, + queryFn: func() (float64, error) { return 0, nil }, + collectFn: func(float64) (CollectResult, error) { return CollectResult{}, nil }, + } + stub := report.NewMockReporter(gomock.NewController(t)) + testCases := []struct { name string opt Option want error }{ - { - name: "disable flags are all true", - opt: Option{ - DisableCPUProf: true, - DisableMemProf: true, - DisableGoroutineProf: true, - }, - want: ErrDisableAllProfiling, - }, - { - name: "invalid CPUThreshold value 1", - opt: Option{ - CPUThreshold: -0.5, - }, - want: ErrInvalidCPUThreshold, - }, - { - name: "invalid CPUThreshold value 2", - opt: Option{ - CPUThreshold: 2.5, - }, - want: ErrInvalidCPUThreshold, - }, - { - name: "invalid MemThreshold value 1", - opt: Option{ - MemThreshold: -0.5, - }, - want: ErrInvalidMemThreshold, - }, - { - name: "invalid MemThreshold value 2", - opt: Option{ - MemThreshold: 2.5, - }, - want: ErrInvalidMemThreshold, - }, - { - name: "invalid GoroutineThreshold value -1", - opt: Option{ - GoroutineThreshold: -1, - }, - want: ErrInvalidGoroutineThreshold, - }, - { - name: "when given reporter is nil", - opt: Option{ - Reporter: nil, - }, - want: ErrNilReporter, - }, - { - name: "valid option 1", - opt: Option{ - Reporter: report.NewSlackReporter( - &report.SlackReporterOption{ - App: "appname", - Token: "token", - ChannelID: "channel_id", - }, - ), - }, - want: nil, - }, - { - name: "valid option 2", - opt: Option{ - MemThreshold: 0.5, - Reporter: report.NewSlackReporter( - &report.SlackReporterOption{ - App: "appname", - Token: "token", - ChannelID: "channel_id", - }, - ), - }, - want: nil, - }, + {"disable all with no custom metrics", + Option{DisableCPUProf: true, DisableMemProf: true, DisableGoroutineProf: true, Reporter: stub}, + ErrDisableAllProfiling}, + {"disable all but one custom metric is allowed", + Option{DisableCPUProf: true, DisableMemProf: true, DisableGoroutineProf: true, Reporter: stub, Metrics: []Metric{validMetric}}, + nil}, + {"invalid CPUThreshold", + Option{CPUThreshold: -0.5, Reporter: stub}, + ErrInvalidCPUThreshold}, + {"invalid MemThreshold", + Option{MemThreshold: 1.5, Reporter: stub}, + ErrInvalidMemThreshold}, + {"invalid GoroutineThreshold", + Option{GoroutineThreshold: -1, Reporter: stub}, + ErrInvalidGoroutineThreshold}, + {"nil Reporter", + Option{CPUThreshold: 0.8}, + ErrNilReporter}, + {"nil Metric entry", + Option{Reporter: stub, Metrics: []Metric{nil}}, + ErrInvalidMetric}, + {"empty name", + Option{Reporter: stub, Metrics: []Metric{&fakeMetric{thresholdVal: 1, queryFn: func() (float64, error) { return 0, nil }, collectFn: func(float64) (CollectResult, error) { return CollectResult{}, nil }}}}, + ErrInvalidMetric}, + {"negative threshold", + Option{Reporter: stub, Metrics: []Metric{&fakeMetric{nameVal: "x", thresholdVal: -1}}}, + ErrInvalidMetric}, + {"negative interval", + Option{Reporter: stub, Metrics: []Metric{&fakeMetric{nameVal: "x", thresholdVal: 1, intervalVal: -time.Second}}}, + ErrInvalidMetric}, + {"valid custom metric", + Option{Reporter: stub, Metrics: []Metric{validMetric}}, + nil}, } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - t.Cleanup(func() { - if globalAp != nil { - globalAp.stop() - globalAp = nil - } - }) - if err := Start(tc.opt); !errors.Is(err, tc.want) { - t.Errorf("Start() = %v, want %v", err, tc.want) - } - if tc.want == nil && globalAp == nil { - t.Errorf("globalAp is nil, want non-nil value") + if err := tc.opt.validate(); !errors.Is(err, tc.want) { + t.Errorf("want %v, got %v", tc.want, err) } }) } } -func TestStop(t *testing.T) { - testCases := []struct { - name string - started bool - }{ - { - name: "stop before start", - started: false, - }, - { - name: "stop after start", - started: true, - }, - } - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - t.Cleanup(func() { - globalAp = nil - }) - if tc.started { - _ = Start(Option{ - MemThreshold: 0.5, - Reporter: report.NewSlackReporter( - &report.SlackReporterOption{ - App: "appname", - Token: "token", - ChannelID: "channel_id", - }, - ), - }) - } - Stop() // Expect no panic. - }) + +// ------------------------------------------------------------------- +// Built-in Metric: watch loop & Reporter routing +// ------------------------------------------------------------------- + +func newTestAp(t *testing.T, reporter report.Reporter) *autoPprof { + t.Helper() + return &autoPprof{ + watchInterval: 20 * time.Millisecond, + minConsecutiveOverThreshold: 3, + reporter: reporter, + cascadedRunners: make(map[string]*metricRunner), + stopC: make(chan struct{}), } } -func TestAutoPprof_loadCPUQuota(t *testing.T) { - testCases := []struct { - name string - newAp func() *autoPprof - wantDisableCPUProfFlag bool - wantErr error - }{ - { - name: "cpu quota is set", - newAp: func() *autoPprof { - ctrl := gomock.NewController(t) - - mockQueryer := queryer.NewMockCgroupsQueryer(ctrl) - mockQueryer.EXPECT(). - SetCPUQuota(). - Return(nil) // Means that the quota is set correctly. - - return &autoPprof{ - cgroupQueryer: mockQueryer, - disableCPUProf: false, - disableMemProf: false, - } - }, - wantDisableCPUProfFlag: false, - wantErr: nil, - }, - { - name: "cpu quota isn't set and memory profiling is enabled", - newAp: func() *autoPprof { - ctrl := gomock.NewController(t) - - mockQueryer := queryer.NewMockCgroupsQueryer(ctrl) - mockQueryer.EXPECT(). - SetCPUQuota(). - Return(queryer.ErrV2CPUQuotaUndefined) - - return &autoPprof{ - cgroupQueryer: mockQueryer, - disableCPUProf: false, - disableMemProf: false, - } - }, - wantDisableCPUProfFlag: true, - wantErr: nil, - }, - { - name: "cpu quota isn't set and memory profiling is disabled", - newAp: func() *autoPprof { - ctrl := gomock.NewController(t) - - mockQueryer := queryer.NewMockCgroupsQueryer(ctrl) - mockQueryer.EXPECT(). - SetCPUQuota(). - Return(queryer.ErrV2CPUQuotaUndefined) - - return &autoPprof{ - cgroupQueryer: mockQueryer, - disableCPUProf: false, - disableMemProf: true, - } - }, - wantDisableCPUProfFlag: false, - wantErr: queryer.ErrV2CPUQuotaUndefined, - }, - } - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - ap := tc.newAp() - err := ap.loadCPUQuota() - if !errors.Is(err, tc.wantErr) { - t.Errorf("loadCPUQuota() = %v, want %v", err, tc.wantErr) - } - if ap.disableCPUProf != tc.wantDisableCPUProfFlag { - t.Errorf("disableCPUProf = %v, want %v", ap.disableCPUProf, tc.wantDisableCPUProfFlag) - } +func TestWatchMetric_builtinCPU_routesToReporter(t *testing.T) { + ctrl := gomock.NewController(t) + mockCG := queryer.NewMockCgroupsQueryer(ctrl) + mockCG.EXPECT().CPUUsage().AnyTimes().Return(0.9, nil) + mockProf := NewMockprofiler(ctrl) + mockProf.EXPECT().profileCPU().AnyTimes().Return([]byte("cpu-bytes"), nil) + + var gotInfo report.ReportInfo + var reported atomic.Int32 + mockReporter := report.NewMockReporter(ctrl) + mockReporter.EXPECT().Report(gomock.Any(), gomock.Any(), gomock.Any()). + AnyTimes(). + DoAndReturn(func(_ context.Context, r io.Reader, info report.ReportInfo) error { + gotInfo = info + reported.Add(1) + return nil }) + + ap := newTestAp(t, mockReporter) + ap.cgroupQueryer = mockCG + ap.profiler = mockProf + ap.app = "myapp" + ap.registerBuiltIn(&cpuMetric{app: ap.app, threshold: 0.75, cg: mockCG, p: mockProf}) + t.Cleanup(func() { ap.stop() }) + + waitFor(t, func() bool { return reported.Load() > 0 }, time.Second) + + if gotInfo.MetricName != "cpu" { + t.Errorf("MetricName = %q, want cpu", gotInfo.MetricName) + } + if gotInfo.Value != 0.9 || gotInfo.Threshold != 0.75 { + t.Errorf("Value=%v Threshold=%v", gotInfo.Value, gotInfo.Threshold) + } + if !strings.Contains(gotInfo.Filename, "samples.cpu") || !strings.Contains(gotInfo.Filename, "myapp") { + t.Errorf("Filename %q lacks expected segments", gotInfo.Filename) + } + if !strings.Contains(gotInfo.Comment, "[CPU]") { + t.Errorf("Comment %q lacks [CPU]", gotInfo.Comment) } } -func TestAutoPprof_watchCPUUsage(t *testing.T) { +func TestWatchMetric_builtinMem_routesToReporter(t *testing.T) { ctrl := gomock.NewController(t) + mockCG := queryer.NewMockCgroupsQueryer(ctrl) + mockCG.EXPECT().MemUsage().AnyTimes().Return(0.9, nil) + mockProf := NewMockprofiler(ctrl) + mockProf.EXPECT().profileHeap().AnyTimes().Return([]byte("heap-bytes"), nil) - var ( - profiled bool - reported bool - ) - - mockQueryer := queryer.NewMockCgroupsQueryer(ctrl) - mockQueryer.EXPECT(). - CPUUsage(). - AnyTimes(). - DoAndReturn( - func() (float64, error) { - return 0.6, nil - }, - ) - - mockProfiler := NewMockprofiler(ctrl) - mockProfiler.EXPECT(). - profileCPU(). - AnyTimes(). - DoAndReturn( - func() ([]byte, error) { - profiled = true - return []byte("prof"), nil - }, - ) - + var gotInfo report.ReportInfo + var reported atomic.Int32 mockReporter := report.NewMockReporter(ctrl) - mockReporter.EXPECT(). - ReportCPUProfile(gomock.Any(), gomock.Any(), gomock.Any()). + mockReporter.EXPECT().Report(gomock.Any(), gomock.Any(), gomock.Any()). AnyTimes(). - DoAndReturn( - func(_ context.Context, _ io.Reader, _ report.CPUInfo) error { - reported = true - return nil - }, - ) - - ap := &autoPprof{ - disableMemProf: true, - watchInterval: 1 * time.Second, - cpuThreshold: 0.5, // 50%. - cgroupQueryer: mockQueryer, - profiler: mockProfiler, - reporter: mockReporter, - stopC: make(chan struct{}), - } + DoAndReturn(func(_ context.Context, r io.Reader, info report.ReportInfo) error { + gotInfo = info + reported.Add(1) + return nil + }) - go ap.watchCPUUsage() + ap := newTestAp(t, mockReporter) + ap.cgroupQueryer = mockCG + ap.profiler = mockProf + ap.registerBuiltIn(&memMetric{threshold: 0.75, cg: mockCG, p: mockProf}) t.Cleanup(func() { ap.stop() }) - // Wait for profiling and reporting. - time.Sleep(1050 * time.Millisecond) - if !profiled { - t.Errorf("cpu usage is not profiled") + waitFor(t, func() bool { return reported.Load() > 0 }, time.Second) + + if gotInfo.MetricName != "mem" { + t.Errorf("MetricName = %q, want mem", gotInfo.MetricName) } - if !reported { - t.Errorf("cpu usage is not reported") + if !strings.Contains(gotInfo.Filename, "alloc_objects") { + t.Errorf("Filename %q lacks heap segments", gotInfo.Filename) } } -func TestAutoPprof_watchCPUUsage_consecutive(t *testing.T) { +func TestWatchMetric_builtinGoroutine_routesToReporter(t *testing.T) { ctrl := gomock.NewController(t) + mockRT := queryer.NewMockRuntimeQueryer(ctrl) + mockRT.EXPECT().GoroutineCount().AnyTimes().Return(200) + mockProf := NewMockprofiler(ctrl) + mockProf.EXPECT().profileGoroutine().AnyTimes().Return([]byte("g-bytes"), nil) - var ( - profiledCnt int - reportedCnt int - ) - - mockQueryer := queryer.NewMockCgroupsQueryer(ctrl) - mockQueryer.EXPECT(). - CPUUsage(). - AnyTimes(). - DoAndReturn( - func() (float64, error) { - return 0.6, nil - }, - ) - - mockProfiler := NewMockprofiler(ctrl) - mockProfiler.EXPECT(). - profileCPU(). - AnyTimes(). - DoAndReturn( - func() ([]byte, error) { - profiledCnt++ - return []byte("prof"), nil - }, - ) - + var gotInfo report.ReportInfo + var reported atomic.Int32 mockReporter := report.NewMockReporter(ctrl) - mockReporter.EXPECT(). - ReportCPUProfile(gomock.Any(), gomock.Any(), gomock.Any()). + mockReporter.EXPECT().Report(gomock.Any(), gomock.Any(), gomock.Any()). AnyTimes(). - DoAndReturn( - func(_ context.Context, _ io.Reader, _ report.CPUInfo) error { - reportedCnt++ - return nil - }, - ) - - ap := &autoPprof{ - disableMemProf: true, - watchInterval: 1 * time.Second, - cpuThreshold: 0.5, // 50%. - minConsecutiveOverThreshold: 3, - cgroupQueryer: mockQueryer, - profiler: mockProfiler, - reporter: mockReporter, - stopC: make(chan struct{}), - } + DoAndReturn(func(_ context.Context, r io.Reader, info report.ReportInfo) error { + gotInfo = info + reported.Add(1) + return nil + }) - go ap.watchCPUUsage() + ap := newTestAp(t, mockReporter) + ap.runtimeQueryer = mockRT + ap.profiler = mockProf + ap.registerBuiltIn(&goroutineMetric{threshold: 100, rt: mockRT, p: mockProf}) t.Cleanup(func() { ap.stop() }) - // Wait for profiling and reporting. - time.Sleep(1050 * time.Millisecond) - if profiledCnt != 1 { - t.Errorf("cpu usage is profiled %d times, want 1", profiledCnt) - } - if reportedCnt != 1 { - t.Errorf("cpu usage is reported %d times, want 1", reportedCnt) - } + waitFor(t, func() bool { return reported.Load() > 0 }, time.Second) - time.Sleep(1050 * time.Millisecond) - // 2nd time. It shouldn't be profiled and reported. - if profiledCnt != 1 { - t.Errorf("cpu usage is profiled %d times, want 1", profiledCnt) + if gotInfo.MetricName != "goroutine" { + t.Errorf("MetricName = %q, want goroutine", gotInfo.MetricName) } - if reportedCnt != 1 { - t.Errorf("cpu usage is reported %d times, want 1", reportedCnt) + if gotInfo.Value != 200 || gotInfo.Threshold != 100 { + t.Errorf("Value=%v Threshold=%v", gotInfo.Value, gotInfo.Threshold) } +} + +// ------------------------------------------------------------------- +// Debounce (minConsecutiveOverThreshold) +// ------------------------------------------------------------------- - time.Sleep(1050 * time.Millisecond) - // 3rd time. It shouldn't be profiled and reported. - if profiledCnt != 1 { - t.Errorf("cpu usage is profiled %d times, want 1", profiledCnt) +func TestWatchMetric_debounce(t *testing.T) { + ctrl := gomock.NewController(t) + var reported atomic.Int32 + mockReporter := report.NewMockReporter(ctrl) + mockReporter.EXPECT().Report(gomock.Any(), gomock.Any(), gomock.Any()). + AnyTimes(). + DoAndReturn(func(_ context.Context, _ io.Reader, _ report.ReportInfo) error { + reported.Add(1) + return nil + }) + + fm := &fakeMetric{ + nameVal: "dbn", + thresholdVal: 10, + intervalVal: 20 * time.Millisecond, + queryFn: func() (float64, error) { return 100, nil }, + collectFn: func(float64) (CollectResult, error) { + return CollectResult{Reader: bytes.NewReader([]byte("x")), Filename: "f", Comment: "c"}, nil + }, } - if reportedCnt != 1 { - t.Errorf("cpu usage is reported %d times, want 1", reportedCnt) + ap := newTestAp(t, mockReporter) + ap.minConsecutiveOverThreshold = 3 + if err := ap.registerMetric(fm); err != nil { + t.Fatal(err) } + t.Cleanup(func() { ap.stop() }) - time.Sleep(1050 * time.Millisecond) - // 4th time. Now it should be profiled and reported. - if profiledCnt != 2 { - t.Errorf("cpu usage is profiled %d times, want 2", profiledCnt) - } - if reportedCnt != 2 { - t.Errorf("cpu usage is reported %d times, want 2", reportedCnt) + // Five ticks @20ms ≈ 100ms — first and fourth ticks should fire. + time.Sleep(170 * time.Millisecond) + got := reported.Load() + if got < 2 || got > 3 { + t.Errorf("expected 2-3 reports with debounce=3, got %d", got) } } -func TestAutoPprof_watchCPUUsage_reportAll(t *testing.T) { - type fields struct { - watchInterval time.Duration - cpuThreshold float64 - reportAll bool - disableMemProf bool - disableGoroutineProf bool - stopC chan struct{} - } - testCases := []struct { - name string - fields fields - mockFunc func(*queryer.MockCgroupsQueryer, *queryer.MockRuntimeQueryer, *Mockprofiler, *report.MockReporter) - }{ - { - name: "reportAll: true", - fields: fields{ - watchInterval: 1 * time.Second, - cpuThreshold: 0.5, // 50%. - reportAll: true, - disableMemProf: false, - disableGoroutineProf: false, - stopC: make(chan struct{}), - }, - mockFunc: func(mockCgroupsQueryer *queryer.MockCgroupsQueryer, mockRuntimeQueryer *queryer.MockRuntimeQueryer, mockProfiler *Mockprofiler, mockReporter *report.MockReporter) { - gomock.InOrder( - mockCgroupsQueryer.EXPECT(). - CPUUsage(). - AnyTimes(). - Return(0.6, nil), - - mockProfiler.EXPECT(). - profileCPU(). - AnyTimes(). - Return([]byte("cpu_prof"), nil), - - mockReporter.EXPECT(). - ReportCPUProfile(gomock.Any(), gomock.Any(), report.CPUInfo{ - ThresholdPercentage: 0.5 * 100, - UsagePercentage: 0.6 * 100, - }). - AnyTimes(). - Return(nil), - - mockCgroupsQueryer.EXPECT(). - MemUsage(). - AnyTimes(). - Return(0.2, nil), - - mockProfiler.EXPECT(). - profileHeap(). - AnyTimes(). - Return([]byte("mem_prof"), nil), - - mockReporter.EXPECT(). - ReportHeapProfile(gomock.Any(), gomock.Any(), report.MemInfo{ - ThresholdPercentage: 0.5 * 100, - UsagePercentage: 0.2 * 100, - }). - AnyTimes(). - Return(nil), - - mockRuntimeQueryer.EXPECT(). - GoroutineCount(). - AnyTimes(). - Return(200), - - mockProfiler.EXPECT(). - profileGoroutine(). - AnyTimes(). - Return([]byte("goroutine_prof"), nil), - - mockReporter.EXPECT(). - ReportGoroutineProfile(gomock.Any(), gomock.Any(), report.GoroutineInfo{ - ThresholdCount: 500, - Count: 200, - }). - AnyTimes(). - Return(nil), - ) - }, - }, - { - name: "reportAll: true, disableMemProf: true", - fields: fields{ - watchInterval: 1 * time.Second, - cpuThreshold: 0.5, // 50%. - reportAll: true, - disableMemProf: true, - disableGoroutineProf: false, - stopC: make(chan struct{}), - }, - mockFunc: func(mockCgroupsQueryer *queryer.MockCgroupsQueryer, mockRuntimeQueryer *queryer.MockRuntimeQueryer, mockProfiler *Mockprofiler, mockReporter *report.MockReporter) { - gomock.InOrder( - mockCgroupsQueryer.EXPECT(). - CPUUsage(). - AnyTimes(). - Return(0.6, nil), - - mockProfiler.EXPECT(). - profileCPU(). - AnyTimes(). - Return([]byte("cpu_prof"), nil), - - mockReporter.EXPECT(). - ReportCPUProfile(gomock.Any(), gomock.Any(), report.CPUInfo{ - ThresholdPercentage: 0.5 * 100, - UsagePercentage: 0.6 * 100, - }). - AnyTimes(). - Return(nil), - - mockRuntimeQueryer.EXPECT(). - GoroutineCount(). - AnyTimes(). - Return(200), - - mockProfiler.EXPECT(). - profileGoroutine(). - AnyTimes(). - Return([]byte("goroutine_prof"), nil), - - mockReporter.EXPECT(). - ReportGoroutineProfile(gomock.Any(), gomock.Any(), report.GoroutineInfo{ - ThresholdCount: 500, - Count: 200, - }). - AnyTimes(). - Return(nil), - ) - }, - }, - { - name: "reportAll: false", - fields: fields{ - watchInterval: 1 * time.Second, - cpuThreshold: 0.5, // 50%. - reportAll: false, - disableMemProf: false, - stopC: make(chan struct{}), - }, - mockFunc: func(mockCgroupsQueryer *queryer.MockCgroupsQueryer, mockRuntimeQueryer *queryer.MockRuntimeQueryer, mockProfiler *Mockprofiler, mockReporter *report.MockReporter) { - gomock.InOrder( - mockCgroupsQueryer.EXPECT(). - CPUUsage(). - AnyTimes(). - Return(0.6, nil), - - mockProfiler.EXPECT(). - profileCPU(). - AnyTimes(). - Return([]byte("cpu_prof"), nil), - - mockReporter.EXPECT(). - ReportCPUProfile(gomock.Any(), gomock.Any(), report.CPUInfo{ - ThresholdPercentage: 0.5 * 100, - UsagePercentage: 0.6 * 100, - }). - AnyTimes(). - Return(nil), - ) - }, - }, - } - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - ctrl := gomock.NewController(t) - - mockCgroupsQueryer := queryer.NewMockCgroupsQueryer(ctrl) - mockRuntimeQueryer := queryer.NewMockRuntimeQueryer(ctrl) - mockProfiler := NewMockprofiler(ctrl) - mockReporter := report.NewMockReporter(ctrl) - - ap := &autoPprof{ - watchInterval: tc.fields.watchInterval, - cpuThreshold: tc.fields.cpuThreshold, - memThreshold: 0.5, // 50%. - goroutineThreshold: 500, - cgroupQueryer: mockCgroupsQueryer, - runtimeQueryer: mockRuntimeQueryer, - profiler: mockProfiler, - reporter: mockReporter, - reportBoth: tc.fields.reportAll, - disableMemProf: tc.fields.disableMemProf, - stopC: tc.fields.stopC, - } +// ------------------------------------------------------------------- +// Cascade +// ------------------------------------------------------------------- - tc.mockFunc(mockCgroupsQueryer, mockRuntimeQueryer, mockProfiler, mockReporter) +func TestCascadeBuiltIn(t *testing.T) { + ctrl := gomock.NewController(t) + mockCG := queryer.NewMockCgroupsQueryer(ctrl) + mockCG.EXPECT().CPUUsage().AnyTimes().Return(0.9, nil) + mockCG.EXPECT().MemUsage().AnyTimes().Return(0.1, nil) // below threshold + mockRT := queryer.NewMockRuntimeQueryer(ctrl) + mockRT.EXPECT().GoroutineCount().AnyTimes().Return(1) // below threshold + mockProf := NewMockprofiler(ctrl) + mockProf.EXPECT().profileCPU().AnyTimes().Return([]byte("c"), nil) + mockProf.EXPECT().profileHeap().AnyTimes().Return([]byte("h"), nil) + mockProf.EXPECT().profileGoroutine().AnyTimes().Return([]byte("g"), nil) + + var cpuCnt, memCnt, goCnt atomic.Int32 + mockReporter := report.NewMockReporter(ctrl) + mockReporter.EXPECT().Report(gomock.Any(), gomock.Any(), gomock.Any()). + AnyTimes(). + DoAndReturn(func(_ context.Context, _ io.Reader, info report.ReportInfo) error { + switch info.MetricName { + case "cpu": + cpuCnt.Add(1) + case "mem": + memCnt.Add(1) + case "goroutine": + goCnt.Add(1) + } + return nil + }) - go ap.watchCPUUsage() - defer ap.stop() + ap := newTestAp(t, mockReporter) + ap.cgroupQueryer = mockCG + ap.runtimeQueryer = mockRT + ap.profiler = mockProf + ap.minConsecutiveOverThreshold = 1000 + ap.registerBuiltIn(&cpuMetric{threshold: 0.5, cg: mockCG, p: mockProf}) + ap.registerBuiltIn(&memMetric{threshold: 0.5, cg: mockCG, p: mockProf}) + ap.registerBuiltIn(&goroutineMetric{threshold: 5, rt: mockRT, p: mockProf}) + t.Cleanup(func() { ap.stop() }) - // Wait for profiling and reporting. - time.Sleep(1050 * time.Millisecond) - }) + // Only CPU is over threshold; cascade should fire Mem and Goroutine too. + waitFor(t, func() bool { + return cpuCnt.Load() > 0 && memCnt.Load() > 0 && goCnt.Load() > 0 + }, 2*time.Second) + if cpuCnt.Load() == 0 || memCnt.Load() == 0 || goCnt.Load() == 0 { + t.Errorf("cascade should fire all built-ins cpu=%d mem=%d goroutine=%d", + cpuCnt.Load(), memCnt.Load(), goCnt.Load()) } } -func TestAutoPprof_watchMemUsage(t *testing.T) { - ctrl := gomock.NewController(t) - - var ( - profiled bool - reported bool - ) +// ------------------------------------------------------------------- +// User metric: trigger, independence, interval, nil reader, defaults +// ------------------------------------------------------------------- - mockQueryer := queryer.NewMockCgroupsQueryer(ctrl) - mockQueryer.EXPECT(). - MemUsage(). +func TestUserMetric_trigger(t *testing.T) { + ctrl := gomock.NewController(t) + var infoSeen report.ReportInfo + var reported atomic.Int32 + mockReporter := report.NewMockReporter(ctrl) + mockReporter.EXPECT().Report(gomock.Any(), gomock.Any(), gomock.Any()). AnyTimes(). - DoAndReturn( - func() (float64, error) { - return 0.3, nil - }, - ) - - mockProfiler := NewMockprofiler(ctrl) - mockProfiler.EXPECT(). - profileHeap(). - DoAndReturn( - func() ([]byte, error) { - profiled = true - return []byte("prof"), nil - }, - ) + DoAndReturn(func(_ context.Context, _ io.Reader, info report.ReportInfo) error { + infoSeen = info + reported.Add(1) + return nil + }) - mockReporter := report.NewMockReporter(ctrl) - mockReporter.EXPECT(). - ReportHeapProfile(gomock.Any(), gomock.Any(), gomock.Any()). - DoAndReturn( - func(_ context.Context, _ io.Reader, _ report.MemInfo) error { - reported = true - return nil - }, - ) - - ap := &autoPprof{ - disableCPUProf: true, - watchInterval: 1 * time.Second, - memThreshold: 0.2, // 20%. - cgroupQueryer: mockQueryer, - profiler: mockProfiler, - reporter: mockReporter, - stopC: make(chan struct{}), + fm := &fakeMetric{ + nameVal: "mycustom", + thresholdVal: 10, + intervalVal: 20 * time.Millisecond, + queryFn: func() (float64, error) { return 42, nil }, + collectFn: func(v float64) (CollectResult, error) { + return CollectResult{ + Reader: bytes.NewReader([]byte("custom-bytes")), + Filename: "user.dump", + Comment: "user comment", + }, nil + }, + } + ap := newTestAp(t, mockReporter) + if err := ap.registerMetric(fm); err != nil { + t.Fatal(err) } - - go ap.watchMemUsage() t.Cleanup(func() { ap.stop() }) - // Wait for profiling and reporting. - time.Sleep(1050 * time.Millisecond) - if !profiled { - t.Errorf("mem usage is not profiled") - } - if !reported { - t.Errorf("mem usage is not reported") + waitFor(t, func() bool { return reported.Load() > 0 }, time.Second) + + if infoSeen.MetricName != "mycustom" || infoSeen.Filename != "user.dump" || + infoSeen.Comment != "user comment" || infoSeen.Value != 42 || + infoSeen.Threshold != 10 { + t.Errorf("unexpected info: %+v", infoSeen) } } -func TestAutoPprof_watchMemUsage_consecutive(t *testing.T) { +func TestUserMetric_independent_noCascade(t *testing.T) { ctrl := gomock.NewController(t) - - var ( - profiledCnt int - reportedCnt int - ) - - mockQueryer := queryer.NewMockCgroupsQueryer(ctrl) - mockQueryer.EXPECT(). - MemUsage(). - AnyTimes(). - DoAndReturn( - func() (float64, error) { - return 0.3, nil - }, - ) - - mockProfiler := NewMockprofiler(ctrl) - mockProfiler.EXPECT(). - profileHeap(). - AnyTimes(). - DoAndReturn( - func() ([]byte, error) { - profiledCnt++ - return []byte("prof"), nil - }, - ) - + mockCG := queryer.NewMockCgroupsQueryer(ctrl) + mockCG.EXPECT().CPUUsage().AnyTimes().Return(0.1, nil) + mockRT := queryer.NewMockRuntimeQueryer(ctrl) + mockRT.EXPECT().GoroutineCount().AnyTimes().Return(1) + mockProf := NewMockprofiler(ctrl) + + var names []string + var mu sync.Mutex mockReporter := report.NewMockReporter(ctrl) - mockReporter.EXPECT(). - ReportHeapProfile(gomock.Any(), gomock.Any(), gomock.Any()). + mockReporter.EXPECT().Report(gomock.Any(), gomock.Any(), gomock.Any()). AnyTimes(). - DoAndReturn( - func(_ context.Context, _ io.Reader, _ report.MemInfo) error { - reportedCnt++ - return nil - }, - ) - - ap := &autoPprof{ - disableCPUProf: true, - watchInterval: 1 * time.Second, - memThreshold: 0.2, // 20%. - minConsecutiveOverThreshold: 3, - cgroupQueryer: mockQueryer, - profiler: mockProfiler, - reporter: mockReporter, - stopC: make(chan struct{}), - } - - go ap.watchMemUsage() - t.Cleanup(func() { ap.stop() }) + DoAndReturn(func(_ context.Context, _ io.Reader, info report.ReportInfo) error { + mu.Lock() + names = append(names, info.MetricName) + mu.Unlock() + return nil + }) - // Wait for profiling and reporting. - time.Sleep(1050 * time.Millisecond) - if profiledCnt != 1 { - t.Errorf("mem usage is profiled %d times, want 1", profiledCnt) + ap := newTestAp(t, mockReporter) + ap.cgroupQueryer = mockCG + ap.runtimeQueryer = mockRT + ap.profiler = mockProf + // Built-in cascade must not pick up user metrics. + ap.registerBuiltIn(&cpuMetric{threshold: 0.5, cg: mockCG, p: mockProf}) + + fm := &fakeMetric{ + nameVal: "customonly", thresholdVal: 10, intervalVal: 20 * time.Millisecond, + queryFn: func() (float64, error) { return 100, nil }, + collectFn: func(float64) (CollectResult, error) { + return CollectResult{Reader: bytes.NewReader([]byte("b"))}, nil + }, } - if reportedCnt != 1 { - t.Errorf("mem usage is reported %d times, want 1", reportedCnt) + if err := ap.registerMetric(fm); err != nil { + t.Fatal(err) } + t.Cleanup(func() { ap.stop() }) - time.Sleep(1050 * time.Millisecond) - // 2nd time. It shouldn't be profiled and reported. - if profiledCnt != 1 { - t.Errorf("mem usage is profiled %d times, want 1", profiledCnt) - } - if reportedCnt != 1 { - t.Errorf("mem usage is reported %d times, want 1", reportedCnt) + waitFor(t, func() bool { + mu.Lock() + defer mu.Unlock() + for _, n := range names { + if n == "customonly" { + return true + } + } + return false + }, time.Second) + + mu.Lock() + defer mu.Unlock() + for _, n := range names { + if n == "cpu" { + t.Errorf("custom trigger caused cpu cascade: %v", names) + return + } } +} - time.Sleep(1050 * time.Millisecond) - // 3rd time. It shouldn't be profiled and reported. - if profiledCnt != 1 { - t.Errorf("mem usage is profiled %d times, want 1", profiledCnt) +func TestUserMetric_perMetricInterval(t *testing.T) { + ctrl := gomock.NewController(t) + var reported atomic.Int32 + mockReporter := report.NewMockReporter(ctrl) + mockReporter.EXPECT().Report(gomock.Any(), gomock.Any(), gomock.Any()). + AnyTimes(). + DoAndReturn(func(_ context.Context, _ io.Reader, _ report.ReportInfo) error { + reported.Add(1) + return nil + }) + + fm := &fakeMetric{ + nameVal: "fast", thresholdVal: 1, + intervalVal: 10 * time.Millisecond, // much faster than global + queryFn: func() (float64, error) { return 10, nil }, + collectFn: func(float64) (CollectResult, error) { + return CollectResult{Reader: bytes.NewReader([]byte("b"))}, nil + }, } - if reportedCnt != 1 { - t.Errorf("mem usage is reported %d times, want 1", reportedCnt) + ap := newTestAp(t, mockReporter) + ap.watchInterval = 10 * time.Second // global should be ignored + if err := ap.registerMetric(fm); err != nil { + t.Fatal(err) } + t.Cleanup(func() { ap.stop() }) + + waitFor(t, func() bool { return reported.Load() > 0 }, 500*time.Millisecond) +} - time.Sleep(1050 * time.Millisecond) - // 4th time. Now it should be profiled and reported. - if profiledCnt != 2 { - t.Errorf("mem usage is profiled %d times, want 2", profiledCnt) +func TestUserMetric_nilReaderSkipsReporter(t *testing.T) { + ctrl := gomock.NewController(t) + mockReporter := report.NewMockReporter(ctrl) + // If Report is ever called, the test fails. + mockReporter.EXPECT().Report(gomock.Any(), gomock.Any(), gomock.Any()).Times(0) + + fm := &fakeMetric{ + nameVal: "noreport", thresholdVal: 1, intervalVal: 20 * time.Millisecond, + queryFn: func() (float64, error) { return 10, nil }, + collectFn: func(float64) (CollectResult, error) { + return CollectResult{Reader: nil}, nil + }, } - if reportedCnt != 2 { - t.Errorf("mem usage is reported %d times, want 2", reportedCnt) + ap := newTestAp(t, mockReporter) + if err := ap.registerMetric(fm); err != nil { + t.Fatal(err) } + t.Cleanup(func() { ap.stop() }) + time.Sleep(80 * time.Millisecond) + // Test passes if the mock's Times(0) expectation is satisfied by gomock. } -func TestAutoPprof_watchMemUsage_reportAll(t *testing.T) { - type fields struct { - watchInterval time.Duration - memThreshold float64 - reportAll bool - disableCPUProf bool - disableGoroutineProf bool - stopC chan struct{} - } - testCases := []struct { - name string - fields fields - mockFunc func(*queryer.MockCgroupsQueryer, *queryer.MockRuntimeQueryer, *Mockprofiler, *report.MockReporter) - }{ - { - name: "reportAll: true", - fields: fields{ - watchInterval: 1 * time.Second, - memThreshold: 0.5, // 50%. - reportAll: true, - disableCPUProf: false, - disableGoroutineProf: false, - stopC: make(chan struct{}), - }, - mockFunc: func(mockCgroupsQueryer *queryer.MockCgroupsQueryer, mockRuntimeQueryer *queryer.MockRuntimeQueryer, mockProfiler *Mockprofiler, mockReporter *report.MockReporter) { - gomock.InOrder( - mockCgroupsQueryer.EXPECT(). - MemUsage(). - AnyTimes(). - Return(0.6, nil), - - mockProfiler.EXPECT(). - profileHeap(). - AnyTimes(). - Return([]byte("mem_prof"), nil), - - mockReporter.EXPECT(). - ReportHeapProfile(gomock.Any(), gomock.Any(), report.MemInfo{ - ThresholdPercentage: 0.5 * 100, - UsagePercentage: 0.6 * 100, - }). - AnyTimes(). - Return(nil), - - mockCgroupsQueryer.EXPECT(). - CPUUsage(). - AnyTimes(). - Return(0.2, nil), - - mockProfiler.EXPECT(). - profileCPU(). - AnyTimes(). - Return([]byte("cpu_prof"), nil), - - mockReporter.EXPECT(). - ReportCPUProfile(gomock.Any(), gomock.Any(), report.CPUInfo{ - ThresholdPercentage: 0.5 * 100, - UsagePercentage: 0.2 * 100, - }). - AnyTimes(). - Return(nil), - - mockRuntimeQueryer.EXPECT(). - GoroutineCount(). - AnyTimes(). - Return(200), - - mockProfiler.EXPECT(). - profileGoroutine(). - AnyTimes(). - Return([]byte("goroutine_prof"), nil), - - mockReporter.EXPECT(). - ReportGoroutineProfile(gomock.Any(), gomock.Any(), report.GoroutineInfo{ - ThresholdCount: 500, - Count: 200, - }). - AnyTimes(). - Return(nil), - ) - }, - }, - { - name: "reportAll: true, disableCPUProf: true", - fields: fields{ - watchInterval: 1 * time.Second, - memThreshold: 0.5, // 50%. - reportAll: true, - disableCPUProf: true, - disableGoroutineProf: false, - stopC: make(chan struct{}), - }, - mockFunc: func(mockCgroupsQueryer *queryer.MockCgroupsQueryer, mockRuntimeQueryer *queryer.MockRuntimeQueryer, mockProfiler *Mockprofiler, mockReporter *report.MockReporter) { - gomock.InOrder( - mockCgroupsQueryer.EXPECT(). - MemUsage(). - AnyTimes(). - Return(0.6, nil), - - mockProfiler.EXPECT(). - profileHeap(). - AnyTimes(). - Return([]byte("mem_prof"), nil), - - mockReporter.EXPECT(). - ReportHeapProfile(gomock.Any(), gomock.Any(), report.MemInfo{ - ThresholdPercentage: 0.5 * 100, - UsagePercentage: 0.6 * 100, - }). - AnyTimes(). - Return(nil), - - mockRuntimeQueryer.EXPECT(). - GoroutineCount(). - AnyTimes(). - Return(200), - - mockProfiler.EXPECT(). - profileGoroutine(). - AnyTimes(). - Return([]byte("goroutine_prof"), nil), - - mockReporter.EXPECT(). - ReportGoroutineProfile(gomock.Any(), gomock.Any(), report.GoroutineInfo{ - ThresholdCount: 500, - Count: 200, - }). - AnyTimes(). - Return(nil), - ) - }, - }, - { - name: "reportAll: false", - fields: fields{ - watchInterval: 1 * time.Second, - memThreshold: 0.5, // 50%. - reportAll: false, - disableCPUProf: false, - disableGoroutineProf: false, - stopC: make(chan struct{}), - }, - mockFunc: func(mockCgroupsQueryer *queryer.MockCgroupsQueryer, mockRuntimeQueryer *queryer.MockRuntimeQueryer, mockProfiler *Mockprofiler, mockReporter *report.MockReporter) { - gomock.InOrder( - mockCgroupsQueryer.EXPECT(). - MemUsage(). - AnyTimes(). - Return(0.6, nil), - - mockProfiler.EXPECT(). - profileHeap(). - AnyTimes(). - Return([]byte("mem_prof"), nil), - - mockReporter.EXPECT(). - ReportHeapProfile(gomock.Any(), gomock.Any(), report.MemInfo{ - ThresholdPercentage: 0.5 * 100, - UsagePercentage: 0.6 * 100, - }). - AnyTimes(). - Return(nil), - ) - }, +func TestUserMetric_defaultFilenameComment(t *testing.T) { + ctrl := gomock.NewController(t) + var info report.ReportInfo + var done atomic.Int32 + mockReporter := report.NewMockReporter(ctrl) + mockReporter.EXPECT().Report(gomock.Any(), gomock.Any(), gomock.Any()). + AnyTimes(). + DoAndReturn(func(_ context.Context, _ io.Reader, i report.ReportInfo) error { + info = i + done.Add(1) + return nil + }) + + fm := &fakeMetric{ + nameVal: "defaults", thresholdVal: 1, intervalVal: 20 * time.Millisecond, + queryFn: func() (float64, error) { return 10, nil }, + collectFn: func(float64) (CollectResult, error) { + return CollectResult{Reader: bytes.NewReader([]byte("x"))}, nil }, } - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - ctrl := gomock.NewController(t) - - mockCgroupsQueryer := queryer.NewMockCgroupsQueryer(ctrl) - mockRuntimeQueryer := queryer.NewMockRuntimeQueryer(ctrl) - mockProfiler := NewMockprofiler(ctrl) - mockReporter := report.NewMockReporter(ctrl) - - ap := &autoPprof{ - watchInterval: tc.fields.watchInterval, - cpuThreshold: 0.5, // 50%. - memThreshold: tc.fields.memThreshold, - goroutineThreshold: 500, - cgroupQueryer: mockCgroupsQueryer, - runtimeQueryer: mockRuntimeQueryer, - profiler: mockProfiler, - reporter: mockReporter, - reportAll: tc.fields.reportAll, - disableCPUProf: tc.fields.disableCPUProf, - stopC: tc.fields.stopC, - } + ap := newTestAp(t, mockReporter) + if err := ap.registerMetric(fm); err != nil { + t.Fatal(err) + } + t.Cleanup(func() { ap.stop() }) - tc.mockFunc(mockCgroupsQueryer, mockRuntimeQueryer, mockProfiler, mockReporter) + waitFor(t, func() bool { return done.Load() > 0 }, time.Second) + if !strings.Contains(info.Filename, "defaults.") || !strings.Contains(info.Filename, ".bin") { + t.Errorf("default Filename=%q not generated", info.Filename) + } + if !strings.Contains(info.Comment, "[defaults]") { + t.Errorf("default Comment=%q not generated", info.Comment) + } +} - go ap.watchMemUsage() - defer ap.stop() +// ------------------------------------------------------------------- +// Register lifecycle +// ------------------------------------------------------------------- - // Wait for profiling and reporting. - time.Sleep(1050 * time.Millisecond) - }) +func TestRegister_errNotStarted(t *testing.T) { + resetGlobal() + m := &fakeMetric{nameVal: "x", thresholdVal: 1, + queryFn: func() (float64, error) { return 0, nil }} + if err := Register(m); !errors.Is(err, ErrNotStarted) { + t.Errorf("want ErrNotStarted, got %v", err) } } -func TestAutoPprof_watchGoroutineCount(t *testing.T) { +func TestRegister_stopsOnAutoPprofStop(t *testing.T) { ctrl := gomock.NewController(t) + var after atomic.Int32 + mockReporter := report.NewMockReporter(ctrl) + mockReporter.EXPECT().Report(gomock.Any(), gomock.Any(), gomock.Any()). + AnyTimes(). + DoAndReturn(func(_ context.Context, _ io.Reader, _ report.ReportInfo) error { + after.Add(1) + return nil + }) - var ( - profiled bool - reported bool - ) + fm := &fakeMetric{nameVal: "gone", thresholdVal: 1, intervalVal: 20 * time.Millisecond, + queryFn: func() (float64, error) { return 10, nil }, + collectFn: func(float64) (CollectResult, error) { + return CollectResult{Reader: bytes.NewReader([]byte("x"))}, nil + }} + ap := newTestAp(t, mockReporter) + if err := ap.registerMetric(fm); err != nil { + t.Fatal(err) + } + waitFor(t, func() bool { return after.Load() > 0 }, time.Second) - mockQueryer := queryer.NewMockRuntimeQueryer(ctrl) - mockQueryer.EXPECT(). - GoroutineCount(). - AnyTimes(). - DoAndReturn( - func() (int, error) { - return 200, nil - }, - ) - - mockProfiler := NewMockprofiler(ctrl) - mockProfiler.EXPECT(). - profileGoroutine(). - DoAndReturn( - func() ([]byte, error) { - profiled = true - return []byte("prof"), nil - }, - ) + ap.stop() + snap := after.Load() + time.Sleep(100 * time.Millisecond) + if delta := after.Load() - snap; delta != 0 { + t.Errorf("after Stop, got %d more reports", delta) + } +} + +// ------------------------------------------------------------------- +// Query error terminates the watcher +// ------------------------------------------------------------------- +func TestWatchMetric_queryErrorExitsWatcher(t *testing.T) { + ctrl := gomock.NewController(t) mockReporter := report.NewMockReporter(ctrl) - mockReporter.EXPECT(). - ReportGoroutineProfile(gomock.Any(), gomock.Any(), gomock.Any()). - DoAndReturn( - func(_ context.Context, _ io.Reader, _ report.GoroutineInfo) error { - reported = true - return nil - }, - ) - - ap := &autoPprof{ - disableCPUProf: true, - disableMemProf: true, - watchInterval: 1 * time.Second, - goroutineThreshold: 100, - runtimeQueryer: mockQueryer, - profiler: mockProfiler, - reporter: mockReporter, - stopC: make(chan struct{}), - } + // Reporter must never be called; Query errored before any fire. + mockReporter.EXPECT().Report(gomock.Any(), gomock.Any(), gomock.Any()).Times(0) - go ap.watchGoroutineCount() - t.Cleanup(func() { ap.stop() }) + ap := newTestAp(t, mockReporter) - // Wait for profiling and reporting. - time.Sleep(1050 * time.Millisecond) - if !profiled { - t.Errorf("goroutine count is not profiled") + broken := &fakeMetric{nameVal: "boom", thresholdVal: 1, intervalVal: 20 * time.Millisecond, + queryFn: func() (float64, error) { return 0, errors.New("boom") }, + collectFn: func(float64) (CollectResult, error) { return CollectResult{}, nil }} + if err := ap.registerMetric(broken); err != nil { + t.Fatal(err) } - if !reported { - t.Errorf("goroutine count is not reported") + + // Give the ticker one or two fires then Stop should return promptly, + // proving the watcher exited on its own. + time.Sleep(60 * time.Millisecond) + ap.stop() + if n := broken.queryCalls.Load(); n < 1 { + t.Errorf("expected at least one Query call, got %d", n) } } -func TestAutoPprof_watchGoroutineCount_consecutive(t *testing.T) { - ctrl := gomock.NewController(t) +// ------------------------------------------------------------------- +// Stop idempotency +// ------------------------------------------------------------------- - var ( - profiledCnt int - reportedCnt int - ) +func TestStop_idempotent(t *testing.T) { + ctrl := gomock.NewController(t) + mockReporter := report.NewMockReporter(ctrl) + ap := newTestAp(t, mockReporter) + ap.stop() + ap.stop() // must not panic (sync.Once) +} - mockQueryer := queryer.NewMockRuntimeQueryer(ctrl) - mockQueryer.EXPECT(). - GoroutineCount(). - AnyTimes(). - DoAndReturn( - func() (int, error) { - return 200, nil - }, - ) - - mockProfiler := NewMockprofiler(ctrl) - mockProfiler.EXPECT(). - profileGoroutine(). - AnyTimes(). - DoAndReturn( - func() ([]byte, error) { - profiledCnt++ - return []byte("prof"), nil - }, - ) +// ------------------------------------------------------------------- +// Concurrency: Register under -race +// ------------------------------------------------------------------- +func TestRegister_concurrent(t *testing.T) { + ctrl := gomock.NewController(t) mockReporter := report.NewMockReporter(ctrl) - mockReporter.EXPECT(). - ReportGoroutineProfile(gomock.Any(), gomock.Any(), gomock.Any()). - AnyTimes(). - DoAndReturn( - func(_ context.Context, _ io.Reader, _ report.GoroutineInfo) error { - reportedCnt++ - return nil - }, - ) - - ap := &autoPprof{ - disableCPUProf: true, - disableMemProf: true, - watchInterval: 1 * time.Second, - goroutineThreshold: 100, - minConsecutiveOverThreshold: 3, - runtimeQueryer: mockQueryer, - profiler: mockProfiler, - reporter: mockReporter, - stopC: make(chan struct{}), - } + mockReporter.EXPECT().Report(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() - go ap.watchGoroutineCount() + ap := newTestAp(t, mockReporter) t.Cleanup(func() { ap.stop() }) - // Wait for profiling and reporting. - time.Sleep(1050 * time.Millisecond) - if profiledCnt != 1 { - t.Errorf("goroutine count is profiled %d times, want 1", profiledCnt) - } - if reportedCnt != 1 { - t.Errorf("goroutine count is reported %d times, want 1", reportedCnt) - } - - time.Sleep(1050 * time.Millisecond) - // 2nd time. It shouldn't be profiled and reported. - if profiledCnt != 1 { - t.Errorf("goroutine count is profiled %d times, want 1", profiledCnt) - } - if reportedCnt != 1 { - t.Errorf("goroutine count is reported %d times, want 1", reportedCnt) + var wg sync.WaitGroup + for i := 0; i < 8; i++ { + wg.Add(1) + go func(id int) { + defer wg.Done() + name := fmt.Sprintf("m%d", id) + m := &fakeMetric{nameVal: name, thresholdVal: 1, intervalVal: 10 * time.Millisecond, + queryFn: func() (float64, error) { return 0, nil }, + collectFn: func(float64) (CollectResult, error) { return CollectResult{}, nil }} + _ = ap.registerMetric(m) + }(i) } + wg.Wait() +} - time.Sleep(1050 * time.Millisecond) - // 3rd time. It shouldn't be profiled and reported. - if profiledCnt != 1 { - t.Errorf("goroutine count is profiled %d times, want 1", profiledCnt) - } - if reportedCnt != 1 { - t.Errorf("goroutine count is reported %d times, want 1", reportedCnt) - } +// ------------------------------------------------------------------- +// NewMetric nil-function defense +// ------------------------------------------------------------------- - time.Sleep(1050 * time.Millisecond) - // 4th time. Now it should be profiled and reported. - if profiledCnt != 2 { - t.Errorf("goroutine count is profiled %d times, want 2", profiledCnt) +func TestNewMetric_nilQueryCollect(t *testing.T) { + m := NewMetric("x", 0, 0, nil, nil) + if _, err := m.Query(); !errors.Is(err, ErrInvalidMetric) { + t.Errorf("Query with nil fn should be ErrInvalidMetric, got %v", err) } - if reportedCnt != 2 { - t.Errorf("goroutine count is reported %d times, want 2", reportedCnt) + if _, err := m.Collect(0); !errors.Is(err, ErrInvalidMetric) { + t.Errorf("Collect with nil fn should be ErrInvalidMetric, got %v", err) } } -func TestAutoPprof_watchGoroutineCount_reportAll(t *testing.T) { - type fields struct { - watchInterval time.Duration - goroutineThreshold int - reportAll bool - disableCPUProf bool - disableMemProf bool - stopC chan struct{} - } - testCases := []struct { - name string - fields fields - mockFunc func(*queryer.MockCgroupsQueryer, *queryer.MockRuntimeQueryer, *Mockprofiler, *report.MockReporter) - }{ - { - name: "reportAll: true", - fields: fields{ - watchInterval: 1 * time.Second, - goroutineThreshold: 100, - reportAll: true, - disableCPUProf: false, - disableMemProf: false, - stopC: make(chan struct{}), - }, - mockFunc: func(mockCgroupsQueryer *queryer.MockCgroupsQueryer, mockRuntimeQueryer *queryer.MockRuntimeQueryer, mockProfiler *Mockprofiler, mockReporter *report.MockReporter) { - gomock.InOrder( - mockRuntimeQueryer.EXPECT(). - GoroutineCount(). - AnyTimes(). - Return(200), - - mockProfiler.EXPECT(). - profileGoroutine(). - AnyTimes(). - Return([]byte("goroutine_prof"), nil), - - mockReporter.EXPECT(). - ReportGoroutineProfile(gomock.Any(), gomock.Any(), report.GoroutineInfo{ - ThresholdCount: 100, - Count: 200, - }). - AnyTimes(). - Return(nil), - - mockCgroupsQueryer.EXPECT(). - CPUUsage(). - AnyTimes(). - Return(0.2, nil), - - mockProfiler.EXPECT(). - profileCPU(). - AnyTimes(). - Return([]byte("cpu_prof"), nil), - - mockReporter.EXPECT(). - ReportCPUProfile(gomock.Any(), gomock.Any(), report.CPUInfo{ - ThresholdPercentage: 0.5 * 100, - UsagePercentage: 0.2 * 100, - }). - AnyTimes(). - Return(nil), - - mockCgroupsQueryer.EXPECT(). - MemUsage(). - AnyTimes(). - Return(0.2, nil), - - mockProfiler.EXPECT(). - profileHeap(). - AnyTimes(). - Return([]byte("mem_prof"), nil), - - mockReporter.EXPECT(). - ReportHeapProfile(gomock.Any(), gomock.Any(), report.MemInfo{ - ThresholdPercentage: 0.5 * 100, - UsagePercentage: 0.2 * 100, - }). - AnyTimes(). - Return(nil), - ) - }, - }, - { - name: "reportAll: true, disableCPUProf: true", - fields: fields{ - watchInterval: 1 * time.Second, - goroutineThreshold: 100, - reportAll: true, - disableCPUProf: true, - disableMemProf: false, - stopC: make(chan struct{}), - }, - mockFunc: func(mockCgroupsQueryer *queryer.MockCgroupsQueryer, mockRuntimeQueryer *queryer.MockRuntimeQueryer, mockProfiler *Mockprofiler, mockReporter *report.MockReporter) { - gomock.InOrder( - mockRuntimeQueryer.EXPECT(). - GoroutineCount(). - AnyTimes(). - Return(200), - - mockProfiler.EXPECT(). - profileGoroutine(). - AnyTimes(). - Return([]byte("goroutine_prof"), nil), - - mockReporter.EXPECT(). - ReportGoroutineProfile(gomock.Any(), gomock.Any(), report.GoroutineInfo{ - ThresholdCount: 100, - Count: 200, - }). - AnyTimes(). - Return(nil), - - mockCgroupsQueryer.EXPECT(). - MemUsage(). - AnyTimes(). - Return(0.2, nil), - - mockProfiler.EXPECT(). - profileHeap(). - AnyTimes(). - Return([]byte("mem_prof"), nil), - - mockReporter.EXPECT(). - ReportHeapProfile(gomock.Any(), gomock.Any(), report.MemInfo{ - ThresholdPercentage: 0.5 * 100, - UsagePercentage: 0.2 * 100, - }). - AnyTimes(). - Return(nil), - ) - }, - }, - { - name: "reportAll: false", - fields: fields{ - watchInterval: 1 * time.Second, - goroutineThreshold: 100, - reportAll: false, - disableCPUProf: false, - disableMemProf: false, - stopC: make(chan struct{}), - }, - mockFunc: func(mockCgroupsQueryer *queryer.MockCgroupsQueryer, mockRuntimeQueryer *queryer.MockRuntimeQueryer, mockProfiler *Mockprofiler, mockReporter *report.MockReporter) { - gomock.InOrder( - mockRuntimeQueryer.EXPECT(). - GoroutineCount(). - AnyTimes(). - Return(200), - - mockProfiler.EXPECT(). - profileGoroutine(). - AnyTimes(). - Return([]byte("goroutine_prof"), nil), - - mockReporter.EXPECT(). - ReportGoroutineProfile(gomock.Any(), gomock.Any(), report.GoroutineInfo{ - ThresholdCount: 100, - Count: 200, - }). - AnyTimes(). - Return(nil), - ) - }, - }, - } - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - ctrl := gomock.NewController(t) - - mockCgroupsQueryer := queryer.NewMockCgroupsQueryer(ctrl) - mockRuntimeQueryer := queryer.NewMockRuntimeQueryer(ctrl) - mockProfiler := NewMockprofiler(ctrl) - mockReporter := report.NewMockReporter(ctrl) - - ap := &autoPprof{ - watchInterval: tc.fields.watchInterval, - cpuThreshold: 0.5, // 50%. - memThreshold: 0.5, // 50%. - goroutineThreshold: tc.fields.goroutineThreshold, - cgroupQueryer: mockCgroupsQueryer, - runtimeQueryer: mockRuntimeQueryer, - profiler: mockProfiler, - reporter: mockReporter, - reportAll: tc.fields.reportAll, - disableCPUProf: tc.fields.disableCPUProf, - stopC: tc.fields.stopC, - } - - tc.mockFunc(mockCgroupsQueryer, mockRuntimeQueryer, mockProfiler, mockReporter) +// ------------------------------------------------------------------- +// Helpers +// ------------------------------------------------------------------- - go ap.watchGoroutineCount() - defer ap.stop() - - // Wait for profiling and reporting. - time.Sleep(1050 * time.Millisecond) - }) +func waitFor(t *testing.T, cond func() bool, total time.Duration) { + t.Helper() + deadline := time.Now().Add(total) + for time.Now().Before(deadline) { + if cond() { + return + } + time.Sleep(5 * time.Millisecond) } + t.Fatalf("condition not met within %v", total) } +// ------------------------------------------------------------------- +// Benchmarks — measure overhead of watching vs. a bare workload. +// ------------------------------------------------------------------- + func fib(n int) int64 { if n <= 1 { return int64(n) @@ -1298,6 +678,26 @@ func fib(n int) int64 { return fib(n-1) + fib(n-2) } +func fibAsync(n int) int64 { + if n <= 1 { + return int64(n) + } + var ( + v int64 + m sync.Mutex + wg sync.WaitGroup + ) + wg.Add(1) + go func() { + defer wg.Done() + m.Lock() + defer m.Unlock() + v = fibAsync(n-1) + fibAsync(n-2) + }() + wg.Wait() + return v +} + func BenchmarkLightJob(b *testing.B) { for i := 0; i < b.N; i++ { fib(10) @@ -1305,10 +705,9 @@ func BenchmarkLightJob(b *testing.B) { } func BenchmarkLightJobWithWatchCPUUsage(b *testing.B) { - var ( - qryer, _ = queryer.NewCgroupQueryer() - ticker = time.NewTicker(defaultWatchInterval) - ) + qryer, _ := queryer.NewCgroupQueryer() + ticker := time.NewTicker(defaultWatchInterval) + defer ticker.Stop() for i := 0; i < b.N; i++ { select { case <-ticker.C: @@ -1320,10 +719,9 @@ func BenchmarkLightJobWithWatchCPUUsage(b *testing.B) { } func BenchmarkLightJobWithWatchMemUsage(b *testing.B) { - var ( - qryer, _ = queryer.NewCgroupQueryer() - ticker = time.NewTicker(defaultWatchInterval) - ) + qryer, _ := queryer.NewCgroupQueryer() + ticker := time.NewTicker(defaultWatchInterval) + defer ticker.Stop() for i := 0; i < b.N; i++ { select { case <-ticker.C: @@ -1341,10 +739,9 @@ func BenchmarkHeavyJob(b *testing.B) { } func BenchmarkHeavyJobWithWatchCPUUsage(b *testing.B) { - var ( - qryer, _ = queryer.NewCgroupQueryer() - ticker = time.NewTicker(defaultWatchInterval) - ) + qryer, _ := queryer.NewCgroupQueryer() + ticker := time.NewTicker(defaultWatchInterval) + defer ticker.Stop() for i := 0; i < b.N; i++ { select { case <-ticker.C: @@ -1356,10 +753,9 @@ func BenchmarkHeavyJobWithWatchCPUUsage(b *testing.B) { } func BenchmarkHeavyJobWithWatchMemUsage(b *testing.B) { - var ( - qryer, _ = queryer.NewCgroupQueryer() - ticker = time.NewTicker(defaultWatchInterval) - ) + qryer, _ := queryer.NewCgroupQueryer() + ticker := time.NewTicker(defaultWatchInterval) + defer ticker.Stop() for i := 0; i < b.N; i++ { select { case <-ticker.C: @@ -1370,30 +766,6 @@ func BenchmarkHeavyJobWithWatchMemUsage(b *testing.B) { } } -func fibAsync(n int) int64 { - if n <= 1 { - return int64(n) - } - - var ( - v int64 - m sync.Mutex - wg sync.WaitGroup - ) - - wg.Add(1) - go func() { - defer wg.Done() - - m.Lock() - defer m.Unlock() - v = fibAsync(n-1) + fibAsync(n-2) - }() - wg.Wait() - - return v -} - func BenchmarkLightAsyncJob(b *testing.B) { for i := 0; i < b.N; i++ { fibAsync(10) @@ -1401,10 +773,9 @@ func BenchmarkLightAsyncJob(b *testing.B) { } func BenchmarkLightAsyncJobWithWatchGoroutineCount(b *testing.B) { - var ( - qryer, _ = queryer.NewRuntimeQueryer() - ticker = time.NewTicker(defaultWatchInterval) - ) + qryer, _ := queryer.NewRuntimeQueryer() + ticker := time.NewTicker(defaultWatchInterval) + defer ticker.Stop() for i := 0; i < b.N; i++ { select { case <-ticker.C: @@ -1422,10 +793,9 @@ func BenchmarkHeavyAsyncJob(b *testing.B) { } func BenchmarkHeavyAsyncJobWithWatchGoroutineCount(b *testing.B) { - var ( - qryer, _ = queryer.NewRuntimeQueryer() - ticker = time.NewTicker(defaultWatchInterval) - ) + qryer, _ := queryer.NewRuntimeQueryer() + ticker := time.NewTicker(defaultWatchInterval) + defer ticker.Stop() for i := 0; i < b.N; i++ { select { case <-ticker.C: diff --git a/autopprof_unsupported.go b/autopprof_unsupported.go index 57d866e..2109ddb 100644 --- a/autopprof_unsupported.go +++ b/autopprof_unsupported.go @@ -10,3 +10,8 @@ func Start(opt Option) error { // Stop does not do anything on unsupported platforms. func Stop() {} + +// Register does not do anything on unsupported platforms. +func Register(m Metric) error { + return ErrUnsupportedPlatform +} diff --git a/autopprof_unsupported_test.go b/autopprof_unsupported_test.go index 61407d6..4184990 100644 --- a/autopprof_unsupported_test.go +++ b/autopprof_unsupported_test.go @@ -28,3 +28,13 @@ func TestStart(t *testing.T) { }) } } + +func TestRegister_unsupportedPlatform(t *testing.T) { + m := NewMetric("x", 1, 0, + func() (float64, error) { return 0, nil }, + func(float64) (CollectResult, error) { return CollectResult{}, nil }, + ) + if err := Register(m); !errors.Is(err, ErrUnsupportedPlatform) { + t.Errorf("Register() = %v, want %v", err, ErrUnsupportedPlatform) + } +} diff --git a/error.go b/error.go index 718255e..6f67a11 100644 --- a/error.go +++ b/error.go @@ -1,21 +1,27 @@ package autopprof -import "fmt" +import "errors" -// Errors. var ( - ErrUnsupportedPlatform = fmt.Errorf( + ErrUnsupportedPlatform = errors.New( "autopprof: unsupported platform (only Linux is supported)", ) - ErrInvalidCPUThreshold = fmt.Errorf( + ErrInvalidCPUThreshold = errors.New( "autopprof: cpu threshold value must be between 0 and 1", ) - ErrInvalidMemThreshold = fmt.Errorf( + ErrInvalidMemThreshold = errors.New( "autopprof: memory threshold value must be between 0 and 1", ) - ErrInvalidGoroutineThreshold = fmt.Errorf( + ErrInvalidGoroutineThreshold = errors.New( "autopprof: goroutine threshold value must be greater than to 0", ) - ErrNilReporter = fmt.Errorf("autopprof: Reporter can't be nil") - ErrDisableAllProfiling = fmt.Errorf("autopprof: all profiling is disabled") + ErrNilReporter = errors.New("autopprof: Reporter can't be nil") + ErrDisableAllProfiling = errors.New("autopprof: all profiling is disabled") + + ErrInvalidMetric = errors.New( + "autopprof: metric is invalid (nil, empty name, negative threshold/interval, or nil query/collect)", + ) + ErrNotStarted = errors.New( + "autopprof: Start() must be called before Register", + ) ) diff --git a/examples/example.go b/examples/example.go index 6963822..48224a5 100644 --- a/examples/example.go +++ b/examples/example.go @@ -4,25 +4,61 @@ package main import ( + "bytes" "errors" "fmt" "log" + "runtime" + "runtime/pprof" + "time" - "github.com/daangn/autopprof" - "github.com/daangn/autopprof/report" + "github.com/daangn/autopprof/v2" + "github.com/daangn/autopprof/v2/report" ) type mm struct { m map[int64]string } +// queue is a toy struct that also implements autopprof.Metric — +// showing how a user's own type can plug directly into the watcher. +type queue struct { + name string + threshold float64 +} + +func (q *queue) Name() string { return q.name } +func (q *queue) Threshold() float64 { return q.threshold } +func (q *queue) Interval() time.Duration { return 3 * time.Second } + +// Query returns the current queue depth. Normally this reads from a +// real backend; here we just return a constant so the threshold is +// always breached and the example always emits one report. +func (q *queue) Query() (float64, error) { return 42, nil } + +// Collect produces the payload (a plaintext snapshot in this example) +// along with the filename and comment autopprof will forward via the +// Reporter. Returning (nil, nil) for Reader would skip the Reporter +// call, which is useful for side-effect-only hooks. +func (q *queue) Collect(value float64) (autopprof.CollectResult, error) { + body := fmt.Sprintf("queue=%s depth=%v threshold=%v", q.name, value, q.threshold) + return autopprof.CollectResult{ + Reader: bytes.NewReader([]byte(body)), + Filename: fmt.Sprintf("%s.snapshot.txt", q.name), + Comment: fmt.Sprintf(":warning:[%s] queue depth %.0f exceeded %.0f", q.name, value, q.threshold), + }, nil +} + func main() { + // (A) Start with the built-in CPU / Mem watchers. Option.App is + // the single source of truth for the "" segment in built-in + // filenames. err := autopprof.Start(autopprof.Option{ + App: "YOUR_APP_NAME", CPUThreshold: 0.8, // Default: 0.75. MemThreshold: 0.8, // Default: 0.75. Reporter: report.NewSlackReporter( &report.SlackReporterOption{ - App: "YOUR_APP_NAME", Token: "YOUR_TOKEN_HERE", ChannelID: "REPORT_CHANNEL_ID", }, @@ -36,6 +72,38 @@ func main() { } defer autopprof.Stop() + // (B) Register a user Metric implemented on a domain struct. + // Perfect for metrics that live inside a lifecycle that starts + // *after* autopprof.Start, e.g. a queue or connection pool whose + // handle isn't yet available at Start time. + q := &queue{name: "ingest", threshold: 10} + if err := autopprof.Register(q); err != nil { + log.Println("Register queue metric:", err) + } + + // (C) Ad-hoc Metric via NewMetric — no custom struct needed. + // Watches the process's goroutine count and dumps a full + // goroutine stack trace when it exceeds the threshold. + _ = autopprof.Register(autopprof.NewMetric( + "goroutine_blocked", + 100, + 5*time.Second, + func() (float64, error) { + return float64(runtime.NumGoroutine()), nil + }, + func(v float64) (autopprof.CollectResult, error) { + var buf bytes.Buffer + if err := pprof.Lookup("goroutine").WriteTo(&buf, 1); err != nil { + return autopprof.CollectResult{}, err + } + return autopprof.CollectResult{ + Reader: bytes.NewReader(buf.Bytes()), + Filename: fmt.Sprintf("goroutine_blocked_%d.txt", time.Now().Unix()), + Comment: fmt.Sprintf(":rotating_light:[GB] count=%d", int(v)), + }, nil + }, + )) + eatMemory() go func() { diff --git a/examples/go.mod b/examples/go.mod index 25fbb49..6b3455d 100644 --- a/examples/go.mod +++ b/examples/go.mod @@ -2,9 +2,9 @@ module github.com/daangn/autopprof/examples go 1.19 -replace github.com/daangn/autopprof => ../ +replace github.com/daangn/autopprof/v2 => ../ -require github.com/daangn/autopprof v0.0.0-00010101000000-000000000000 +require github.com/daangn/autopprof/v2 v2.0.0-00010101000000-000000000000 require ( github.com/cilium/ebpf v0.4.0 // indirect diff --git a/go.mod b/go.mod index 843ebea..a8f91a3 100644 --- a/go.mod +++ b/go.mod @@ -1,4 +1,4 @@ -module github.com/daangn/autopprof +module github.com/daangn/autopprof/v2 go 1.19 diff --git a/metric.go b/metric.go new file mode 100644 index 0000000..ae823eb --- /dev/null +++ b/metric.go @@ -0,0 +1,132 @@ +package autopprof + +import ( + "bytes" + "fmt" + "io" + "os" + "time" +) + +const reportTimeLayout = "2006-01-02T150405.MST" + +// CollectResult is the payload Metric.Collect hands to autopprof. +// Reader == nil means "handled internally, skip the Reporter call" +// (useful for side-effect-only hooks). Empty Filename/Comment are +// filled in with autopprof defaults. +type CollectResult struct { + Reader io.Reader + Filename string + Comment string +} + +// Metric is the unified abstraction for every threshold-triggered +// data collection autopprof performs. Built-in CPU/Mem/Goroutine +// watchers are pre-defined implementations; users register additional +// Metrics via Option.Metrics or autopprof.Register. +// +// Thread-safety: autopprof only calls Query and Collect from the +// Metric's own watcher goroutine, so implementations do not need +// internal synchronization. (The ReportAll cascade touches only +// built-ins.) +// +// Name/Threshold/Interval are read once at registration. Interval == 0 +// means "use the global watchInterval (default 5s)". +type Metric interface { + Name() string + Threshold() float64 + Interval() time.Duration + Query() (float64, error) + Collect(value float64) (CollectResult, error) +} + +// NewMetric is a convenience constructor. Nil query/collect surface +// ErrInvalidMetric at call time instead of panicking. +func NewMetric( + name string, + threshold float64, + interval time.Duration, + query func() (float64, error), + collect func(value float64) (CollectResult, error), +) Metric { + if query == nil { + query = func() (float64, error) { return 0, ErrInvalidMetric } + } + if collect == nil { + collect = func(float64) (CollectResult, error) { + return CollectResult{}, ErrInvalidMetric + } + } + return &basicMetric{ + name: name, + threshold: threshold, + interval: interval, + query: query, + collect: collect, + } +} + +type basicMetric struct { + name string + threshold float64 + interval time.Duration + query func() (float64, error) + collect func(value float64) (CollectResult, error) +} + +func (b *basicMetric) Name() string { return b.name } +func (b *basicMetric) Threshold() float64 { return b.threshold } +func (b *basicMetric) Interval() time.Duration { return b.interval } +func (b *basicMetric) Query() (float64, error) { return b.query() } +func (b *basicMetric) Collect(v float64) (CollectResult, error) { return b.collect(v) } + +func validateMetric(m Metric) error { + if m == nil { + return ErrInvalidMetric + } + if m.Name() == "" || m.Threshold() < 0 || m.Interval() < 0 { + return ErrInvalidMetric + } + return nil +} + +func hostnameSafe() string { + h, _ := os.Hostname() + return h +} + +func collectProfile( + app, filenameFmt string, + profile func() ([]byte, error), + comment string, +) (CollectResult, error) { + b, err := profile() + if err != nil { + return CollectResult{}, err + } + now := time.Now().Format(reportTimeLayout) + return CollectResult{ + Reader: bytes.NewReader(b), + Filename: fmt.Sprintf(filenameFmt, app, hostnameSafe(), now), + Comment: comment, + }, nil +} + +var _ io.Reader = (*bytes.Reader)(nil) + +// defaultFilename is used when Collect returns an empty Filename. The +// ".bin" extension signals "opaque bytes" to Reporter implementations +// that don't recognize the metric name. +func defaultFilename(metricName string) string { + return fmt.Sprintf( + "%s.%s.%s.bin", + metricName, hostnameSafe(), time.Now().Format(reportTimeLayout), + ) +} + +func defaultComment(metricName string, value, threshold float64) string { + return fmt.Sprintf( + ":rotating_light:[%s] value=%.2f threshold=%.2f", + metricName, value, threshold, + ) +} diff --git a/metric_cpu.go b/metric_cpu.go new file mode 100644 index 0000000..621b300 --- /dev/null +++ b/metric_cpu.go @@ -0,0 +1,38 @@ +//go:build linux +// +build linux + +package autopprof + +import ( + "fmt" + "time" + + "github.com/daangn/autopprof/v2/queryer" +) + +const ( + MetricNameCPU = "cpu" + + cpuProfileFilenameFmt = "pprof.%s.%s.samples.cpu.%s.pprof" + cpuCommentFmt = ":rotating_light:[CPU] usage (*%.2f%%*) > threshold (*%.2f%%*)" +) + +type cpuMetric struct { + app string + threshold float64 + cg queryer.CgroupsQueryer + p profiler +} + +func (m *cpuMetric) Name() string { return MetricNameCPU } +func (m *cpuMetric) Threshold() float64 { return m.threshold } +func (m *cpuMetric) Interval() time.Duration { return 0 } +func (m *cpuMetric) Query() (float64, error) { return m.cg.CPUUsage() } + +func (m *cpuMetric) Collect(value float64) (CollectResult, error) { + return collectProfile( + m.app, cpuProfileFilenameFmt, + m.p.profileCPU, + fmt.Sprintf(cpuCommentFmt, value*100, m.threshold*100), + ) +} diff --git a/metric_goroutine.go b/metric_goroutine.go new file mode 100644 index 0000000..b11d926 --- /dev/null +++ b/metric_goroutine.go @@ -0,0 +1,43 @@ +//go:build linux +// +build linux + +package autopprof + +import ( + "fmt" + "time" + + "github.com/daangn/autopprof/v2/queryer" +) + +const ( + MetricNameGoroutine = "goroutine" + + goroutineProfileFilenameFmt = "pprof.%s.%s.goroutine.%s.pprof" + goroutineCommentFmt = ":rotating_light:[GOROUTINE] count (*%d*) > threshold (*%d*)" +) + +// goroutineMetric keeps its threshold as int to mirror +// Option.GoroutineThreshold; the int(value) cast in Collect preserves +// the integer-formatted legacy comment. +type goroutineMetric struct { + app string + threshold int + rt queryer.RuntimeQueryer + p profiler +} + +func (m *goroutineMetric) Name() string { return MetricNameGoroutine } +func (m *goroutineMetric) Threshold() float64 { return float64(m.threshold) } +func (m *goroutineMetric) Interval() time.Duration { return 0 } +func (m *goroutineMetric) Query() (float64, error) { + return float64(m.rt.GoroutineCount()), nil +} + +func (m *goroutineMetric) Collect(value float64) (CollectResult, error) { + return collectProfile( + m.app, goroutineProfileFilenameFmt, + m.p.profileGoroutine, + fmt.Sprintf(goroutineCommentFmt, int(value), m.threshold), + ) +} diff --git a/metric_mem.go b/metric_mem.go new file mode 100644 index 0000000..e617e49 --- /dev/null +++ b/metric_mem.go @@ -0,0 +1,38 @@ +//go:build linux +// +build linux + +package autopprof + +import ( + "fmt" + "time" + + "github.com/daangn/autopprof/v2/queryer" +) + +const ( + MetricNameMem = "mem" + + heapProfileFilenameFmt = "pprof.%s.%s.alloc_objects.alloc_space.inuse_objects.inuse_space.%s.pprof" + memCommentFmt = ":rotating_light:[MEM] usage (*%.2f%%*) > threshold (*%.2f%%*)" +) + +type memMetric struct { + app string + threshold float64 + cg queryer.CgroupsQueryer + p profiler +} + +func (m *memMetric) Name() string { return MetricNameMem } +func (m *memMetric) Threshold() float64 { return m.threshold } +func (m *memMetric) Interval() time.Duration { return 0 } +func (m *memMetric) Query() (float64, error) { return m.cg.MemUsage() } + +func (m *memMetric) Collect(value float64) (CollectResult, error) { + return collectProfile( + m.app, heapProfileFilenameFmt, + m.p.profileHeap, + fmt.Sprintf(memCommentFmt, value*100, m.threshold*100), + ) +} diff --git a/option.go b/option.go index e4b161f..af1c2cd 100644 --- a/option.go +++ b/option.go @@ -3,62 +3,66 @@ package autopprof import ( "time" - "github.com/daangn/autopprof/report" + "github.com/daangn/autopprof/v2/report" ) const ( + defaultApp = "autopprof" defaultCPUThreshold = 0.75 defaultMemThreshold = 0.75 defaultGoroutineThreshold = 50000 defaultWatchInterval = 5 * time.Second defaultCPUProfilingDuration = 10 * time.Second - defaultMinConsecutiveOverThreshold = 12 // min 1 minute. (12*5s) + defaultMinConsecutiveOverThreshold = 12 // 12 * 5s == 1 minute ) -// Option is the configuration for the autopprof. +// Option is the configuration for autopprof. type Option struct { - // DisableCPUProf disables the CPU profiling. + // DisableCPUProf disables the CPU profiling. Disabled built-ins + // are also skipped by the cascade that fires when any other + // built-in breaches its threshold. DisableCPUProf bool - // DisableMemProf disables the memory profiling. + // DisableMemProf disables the memory profiling. Disabled built-ins + // are also skipped by the cascade that fires when any other + // built-in breaches its threshold. DisableMemProf bool - // DisableGoroutineProf disables the goroutine profiling. + // DisableGoroutineProf disables the goroutine profiling. Disabled + // built-ins are also skipped by the cascade that fires when any + // other built-in breaches its threshold. DisableGoroutineProf bool - // CPUThreshold is the cpu usage threshold (between 0 and 1) - // to trigger the cpu profiling. - // Autopprof will start the cpu profiling when the cpu usage - // is higher than this threshold. + // CPUThreshold is the cpu usage threshold (between 0 and 1) to + // trigger the cpu profiling. Autopprof starts cpu profiling when + // the cpu usage is higher than this threshold. CPUThreshold float64 - // MemThreshold is the memory usage threshold (between 0 and 1) - // to trigger the heap profiling. - // Autopprof will start the heap profiling when the memory usage - // is higher than this threshold. + // MemThreshold is the memory usage threshold (between 0 and 1) to + // trigger the heap profiling. Autopprof starts heap profiling + // when the memory usage is higher than this threshold. MemThreshold float64 - // GoroutineThreshold is the goroutine count threshold to trigger the goroutine profiling. - // to trigger the goroutine profiling. - // Autopprof will start the goroutine profiling when the goroutine count - // is higher than this threshold. + // GoroutineThreshold is the goroutine count threshold to trigger + // the goroutine profiling. Autopprof starts goroutine profiling + // when the goroutine count is higher than this threshold. GoroutineThreshold int - // deprecated: use reportAll instead. - // ReportBoth sets whether to trigger reports for both CPU and memory when either threshold is exceeded. - // If some profiling is disabled, exclude it. - ReportBoth bool + // Reporter is the reporter to send the profiling report. Must + // implement the report.Reporter interface. + Reporter report.Reporter - // ReportAll sets whether to trigger reports for all profiling types when any threshold is exceeded. - // If some profiling is disabled, exclude it. - ReportAll bool + // App is embedded in built-in CPU/Mem/Goroutine filenames as the + // "" segment. Defaults to "autopprof" when left empty. + App string - // Reporter is the reporter to send the profiling report implementing - // the report.Reporter interface. - Reporter report.Reporter + // Metrics are user-defined Metrics registered at Start. Additional + // metrics can be added later via autopprof.Register. + Metrics []Metric } -// NOTE(mingrammer): testing the validate() is done in autopprof_test.go. func (o Option) validate() error { - if o.DisableCPUProf && o.DisableMemProf && o.DisableGoroutineProf { + // Allow disabling every built-in as long as at least one custom + // Metric is registered. + if o.DisableCPUProf && o.DisableMemProf && o.DisableGoroutineProf && len(o.Metrics) == 0 { return ErrDisableAllProfiling } if o.CPUThreshold < 0 || o.CPUThreshold > 1 { @@ -73,5 +77,11 @@ func (o Option) validate() error { if o.Reporter == nil { return ErrNilReporter } + + for _, m := range o.Metrics { + if err := validateMetric(m); err != nil { + return err + } + } return nil } diff --git a/profile.go b/profile.go index 6a3b25e..1171dc1 100644 --- a/profile.go +++ b/profile.go @@ -4,6 +4,7 @@ import ( "bufio" "bytes" "runtime/pprof" + "sync" "time" ) @@ -23,6 +24,12 @@ type defaultProfiler struct { // the enough cpu profiling data. // Default: 10s. cpuProfilingDuration time.Duration + + // cpuMu serializes profileCPU calls. pprof.StartCPUProfile is a + // process-wide singleton — concurrent invocations would make the + // second one fail immediately. With ReportAll a cascade path can + // land on CPU at the same tick as its own watcher, so we gate it. + cpuMu sync.Mutex } func newDefaultProfiler(duration time.Duration) *defaultProfiler { @@ -32,6 +39,9 @@ func newDefaultProfiler(duration time.Duration) *defaultProfiler { } func (p *defaultProfiler) profileCPU() ([]byte, error) { + p.cpuMu.Lock() + defer p.cpuMu.Unlock() + var ( buf bytes.Buffer w = bufio.NewWriter(&buf) diff --git a/queryer/cgroupv1.go b/queryer/cgroupv1.go index 97cb007..65b4a8c 100644 --- a/queryer/cgroupv1.go +++ b/queryer/cgroupv1.go @@ -30,6 +30,7 @@ type cgroupV1 struct { cpuQuota float64 + // q is the CPU-usage snapshot queue. q cpuUsageSnapshotQueuer } @@ -50,6 +51,7 @@ func (c *cgroupV1) CPUUsage() (float64, error) { if err != nil { return 0, err } + c.snapshotCPUUsage(stat.CPU.Usage.Total) // In nanoseconds. // Calculate the usage only if there are enough snapshots. diff --git a/queryer/cgroupv2.go b/queryer/cgroupv2.go index a44b95f..9e6d277 100644 --- a/queryer/cgroupv2.go +++ b/queryer/cgroupv2.go @@ -34,6 +34,7 @@ type cgroupV2 struct { cpuQuota float64 + // q is the CPU-usage snapshot queue. q cpuUsageSnapshotQueuer } @@ -54,6 +55,7 @@ func (c *cgroupV2) CPUUsage() (float64, error) { if err != nil { return 0, err } + c.snapshotCPUUsage(stat.CPU.UsageUsec) // In microseconds. // Calculate the usage only if there are enough snapshots. diff --git a/queryer/queue.go b/queryer/queue.go index c62da7d..a3f348f 100644 --- a/queryer/queue.go +++ b/queryer/queue.go @@ -1,6 +1,9 @@ package queryer -import "time" +import ( + "sync" + "time" +) // cpuUsageSnapshotQueue is a circular queue of cpuUsageSnapshot. // It doesn't implement dequeue() method because it's not needed. @@ -16,11 +19,6 @@ type cpuUsageSnapshotQueuer interface { // IsFull returns true if the queue is full. isFull() bool - - // The maximum number of elements that the queue can hold. - cap() int - // The number of elements that the queue holds. - len() int } type cpuUsageSnapshot struct { @@ -30,7 +28,10 @@ type cpuUsageSnapshot struct { timestamp time.Time } +// cpuUsageSnapshotQueue is goroutine-safe: every exported method takes +// the internal mutex. Callers don't need to serialize access. type cpuUsageSnapshotQueue struct { + mu sync.Mutex list []*cpuUsageSnapshot headIdx int tailIdx int @@ -43,42 +44,44 @@ func newCPUUsageSnapshotQueue(cap int) *cpuUsageSnapshotQueue { } func (q *cpuUsageSnapshotQueue) enqueue(cs *cpuUsageSnapshot) { - if q.len() == q.cap() { + q.mu.Lock() + defer q.mu.Unlock() + c := cap(q.list) + if len(q.list) == c { q.list[q.tailIdx] = cs - q.tailIdx = (q.tailIdx + 1) % q.cap() - q.headIdx = (q.headIdx + 1) % q.cap() + q.tailIdx = (q.tailIdx + 1) % c + q.headIdx = (q.headIdx + 1) % c } else { q.list = append(q.list, cs) - q.tailIdx = (q.tailIdx + 1) % q.cap() + q.tailIdx = (q.tailIdx + 1) % c } } func (q *cpuUsageSnapshotQueue) head() *cpuUsageSnapshot { - if q.len() == 0 { + q.mu.Lock() + defer q.mu.Unlock() + if len(q.list) == 0 { return nil } return q.list[q.headIdx] } func (q *cpuUsageSnapshotQueue) tail() *cpuUsageSnapshot { - if q.len() == 0 { + q.mu.Lock() + defer q.mu.Unlock() + if len(q.list) == 0 { return nil } + c := cap(q.list) baseIdx := q.tailIdx if baseIdx == 0 { - baseIdx = q.cap() + baseIdx = c } - return q.list[(baseIdx-1)%q.cap()] + return q.list[(baseIdx-1)%c] } func (q *cpuUsageSnapshotQueue) isFull() bool { - return q.len() == q.cap() -} - -func (q *cpuUsageSnapshotQueue) cap() int { - return cap(q.list) -} - -func (q *cpuUsageSnapshotQueue) len() int { - return len(q.list) + q.mu.Lock() + defer q.mu.Unlock() + return len(q.list) == cap(q.list) } diff --git a/queryer/queue_test.go b/queryer/queue_test.go index df34312..8714484 100644 --- a/queryer/queue_test.go +++ b/queryer/queue_test.go @@ -246,8 +246,8 @@ func TestCPUUsageSnapshotQueue_cap(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { q := tc.newQ() - if got := q.cap(); got != tc.want { - t.Errorf("cap() = %v, want %v", got, tc.want) + if got := cap(q.list); got != tc.want { + t.Errorf("cap(q.list) = %v, want %v", got, tc.want) } }) } @@ -292,8 +292,8 @@ func TestCPUUsageSnapshotQueue_len(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { q := tc.newQ() - if got := q.len(); got != tc.want { - t.Errorf("len() = %v, want %v", got, tc.want) + if got := len(q.list); got != tc.want { + t.Errorf("len(q.list) = %v, want %v", got, tc.want) } }) } diff --git a/report/report.go b/report/report.go index ee7513e..451ad9c 100644 --- a/report/report.go +++ b/report/report.go @@ -7,45 +7,36 @@ import ( //go:generate mockgen -source=report.go -destination=report_mock.go -package=report -const ( - // CPUProfileFilenameFmt is the filename format for the CPU profile. - // pprof...samples.cpu..pprof. - CPUProfileFilenameFmt = "pprof.%s.%s.samples.cpu.%s.pprof" - - // HeapProfileFilenameFmt is the filename format for the heap profile. - // pprof...alloc_objects.alloc_space.inuse_objects.inuse_space..pprof. - HeapProfileFilenameFmt = "pprof.%s.%s.alloc_objects.alloc_space.inuse_objects.inuse_space.%s.pprof" - - // GoroutineProfileFilenameFmt is the filename format for the goroutine profile. - // pprof...goroutine..pprof. - GoroutineProfileFilenameFmt = "pprof.%s.%s.goroutine.%s.pprof" -) - -// Reporter is responsible for reporting the profiling report to the destination. -type Reporter interface { - // ReportCPUProfile sends the CPU profiling data to the specific destination. - ReportCPUProfile(ctx context.Context, r io.Reader, ci CPUInfo) error - - // ReportHeapProfile sends the heap profiling data to the specific destination. - ReportHeapProfile(ctx context.Context, r io.Reader, mi MemInfo) error - - // ReportGoroutineProfile sends the goroutine profiling data to the specific destination. - ReportGoroutineProfile(ctx context.Context, r io.Reader, gi GoroutineInfo) error +// ReportInfo carries structured metadata about a report so Reporter +// implementations can route or re-format without parsing filenames +// or comments. +type ReportInfo struct { + // MetricName is "cpu", "mem", "goroutine", or the user-supplied + // name of a Metric registered via autopprof.Register / + // Option.Metrics. + MetricName string + + // Filename is what autopprof chose for the upload. If the Metric + // returned a non-empty Filename via CollectResult it is used as-is; + // otherwise autopprof fills in a default. + Filename string + + // Comment is the human-readable message associated with the report. + // Same filling rule as Filename. + Comment string + + // Value is the latest Query() value that triggered this report. + Value float64 + + // Threshold is the Metric's configured threshold. + Threshold float64 } -// CPUInfo is the CPU usage information. -type CPUInfo struct { - ThresholdPercentage float64 - UsagePercentage float64 -} - -// MemInfo is the memory usage information. -type MemInfo struct { - ThresholdPercentage float64 - UsagePercentage float64 -} - -type GoroutineInfo struct { - ThresholdCount int - Count int +// Reporter sends a single profile/payload to its destination. Every +// Metric (built-in CPU/Mem/Goroutine or user-defined) routes through +// this one method. The caller (autopprof) provides a preformatted +// filename/comment via Metric.Collect, plus structured metadata in +// ReportInfo so the Reporter can decide how to present the message. +type Reporter interface { + Report(ctx context.Context, r io.Reader, info ReportInfo) error } diff --git a/report/report_mock.go b/report/report_mock.go index c688176..bbc4d77 100644 --- a/report/report_mock.go +++ b/report/report_mock.go @@ -35,44 +35,16 @@ func (m *MockReporter) EXPECT() *MockReporterMockRecorder { return m.recorder } -// ReportCPUProfile mocks base method. -func (m *MockReporter) ReportCPUProfile(ctx context.Context, r io.Reader, ci CPUInfo) error { +// Report mocks base method. +func (m *MockReporter) Report(ctx context.Context, r io.Reader, info ReportInfo) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "ReportCPUProfile", ctx, r, ci) + ret := m.ctrl.Call(m, "Report", ctx, r, info) ret0, _ := ret[0].(error) return ret0 } -// ReportCPUProfile indicates an expected call of ReportCPUProfile. -func (mr *MockReporterMockRecorder) ReportCPUProfile(ctx, r, ci interface{}) *gomock.Call { +// Report indicates an expected call of Report. +func (mr *MockReporterMockRecorder) Report(ctx, r, info interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReportCPUProfile", reflect.TypeOf((*MockReporter)(nil).ReportCPUProfile), ctx, r, ci) -} - -// ReportGoroutineProfile mocks base method. -func (m *MockReporter) ReportGoroutineProfile(ctx context.Context, r io.Reader, gi GoroutineInfo) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "ReportGoroutineProfile", ctx, r, gi) - ret0, _ := ret[0].(error) - return ret0 -} - -// ReportGoroutineProfile indicates an expected call of ReportGoroutineProfile. -func (mr *MockReporterMockRecorder) ReportGoroutineProfile(ctx, r, gi interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReportGoroutineProfile", reflect.TypeOf((*MockReporter)(nil).ReportGoroutineProfile), ctx, r, gi) -} - -// ReportHeapProfile mocks base method. -func (m *MockReporter) ReportHeapProfile(ctx context.Context, r io.Reader, mi MemInfo) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "ReportHeapProfile", ctx, r, mi) - ret0, _ := ret[0].(error) - return ret0 -} - -// ReportHeapProfile indicates an expected call of ReportHeapProfile. -func (mr *MockReporterMockRecorder) ReportHeapProfile(ctx, r, mi interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReportHeapProfile", reflect.TypeOf((*MockReporter)(nil).ReportHeapProfile), ctx, r, mi) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Report", reflect.TypeOf((*MockReporter)(nil).Report), ctx, r, info) } diff --git a/report/slack.go b/report/slack.go index c7fd3c8..f9ed1cf 100644 --- a/report/slack.go +++ b/report/slack.go @@ -5,26 +5,13 @@ import ( "context" "fmt" "io" - "os" - "time" "github.com/slack-go/slack" ) -const ( - reportTimeLayout = "2006-01-02T150405.MST" - - cpuCommentFmt = ":rotating_light:[CPU] usage (*%.2f%%*) > threshold (*%.2f%%*)" - memCommentFmt = ":rotating_light:[MEM] usage (*%.2f%%*) > threshold (*%.2f%%*)" - goroutineCommentFmt = ":rotating_light:[GOROUTINE] count (*%d*) > threshold (*%d*)" -) - // SlackReporter is the reporter to send the profiling report to the // specific Slack channel. type SlackReporter struct { - app string - channel string - channelID string client *slack.Client @@ -32,117 +19,63 @@ type SlackReporter struct { // SlackReporterOption is the option for the Slack reporter. type SlackReporterOption struct { - App string - Token string - // Deprecated: Use ChannelID instead. Reporting with a channel name is no longer supported because the latest Slack API for file uploads requires a channel ID instead of a channel name. - // For more details about the Slack API, refer to: https://api.slack.com/methods/files.completeUploadExternal - // - // For details about the file upload process: https://api.slack.com/messaging/files#uploading_files - Channel string + Token string ChannelID string } // NewSlackReporter returns the new SlackReporter. func NewSlackReporter(opt *SlackReporterOption) *SlackReporter { return &SlackReporter{ - app: opt.App, - channel: opt.Channel, channelID: opt.ChannelID, client: slack.New(opt.Token), } } -// ReportCPUProfile sends the CPU profiling data to the Slack. -func (s *SlackReporter) ReportCPUProfile( - ctx context.Context, r io.Reader, ci CPUInfo, -) error { - hostname, _ := os.Hostname() // Don't care about this error. - var ( - now = time.Now().Format(reportTimeLayout) - filename = fmt.Sprintf(CPUProfileFilenameFmt, s.app, hostname, now) - comment = fmt.Sprintf(cpuCommentFmt, ci.UsagePercentage, ci.ThresholdPercentage) - ) - if err := s.reportProfile(ctx, r, filename, comment); err != nil { - return fmt.Errorf("autopprof: failed to upload a file to Slack channel: %w", err) - } - return nil -} - -// ReportHeapProfile sends the heap profiling data to the Slack. -func (s *SlackReporter) ReportHeapProfile( - ctx context.Context, r io.Reader, mi MemInfo, -) error { - hostname, _ := os.Hostname() // Don't care about this error. - var ( - now = time.Now().Format(reportTimeLayout) - filename = fmt.Sprintf(HeapProfileFilenameFmt, s.app, hostname, now) - comment = fmt.Sprintf(memCommentFmt, mi.UsagePercentage, mi.ThresholdPercentage) - ) - if err := s.reportProfile(ctx, r, filename, comment); err != nil { - return fmt.Errorf("autopprof: failed to upload a file to Slack channel: %w", err) - } - return nil -} - -// ReportGoroutineProfile sends the goroutine profiling data to the Slack. -func (s *SlackReporter) ReportGoroutineProfile( - ctx context.Context, r io.Reader, gi GoroutineInfo, +// Report sends the profiling report to Slack. The filename and +// comment are provided by autopprof (either supplied by the Metric's +// Collect or filled with defaults). +func (s *SlackReporter) Report( + ctx context.Context, r io.Reader, info ReportInfo, ) error { - hostname, _ := os.Hostname() // Don't care about this error. - var ( - now = time.Now().Format(reportTimeLayout) - filename = fmt.Sprintf(GoroutineProfileFilenameFmt, s.app, hostname, now) - comment = fmt.Sprintf(goroutineCommentFmt, gi.Count, gi.ThresholdCount) - ) - if err := s.reportProfile(ctx, r, filename, comment); err != nil { + if err := s.reportProfile(ctx, r, info.Filename, info.Comment); err != nil { return fmt.Errorf("autopprof: failed to upload a file to Slack channel: %w", err) } return nil } func (s *SlackReporter) reportProfile(ctx context.Context, r io.Reader, filename, comment string) error { - if s.channelID != "" { - fileSize := 0 - reader := r - if seeker, ok := r.(io.Seeker); ok { - size, err := seeker.Seek(0, io.SeekEnd) - if err != nil { - return fmt.Errorf("failed to determine reader size by seeking: %w", err) - } - fileSize = int(size) - - // Reset the stream's cursor to the beginning. - // If we don't do this, the Slack client will start reading from the end of the stream - // and upload an empty data. - _, err = seeker.Seek(0, io.SeekStart) - if err != nil { - return fmt.Errorf("failed to seek back to start: %w", err) - } - } else { - data, err := io.ReadAll(r) - if err != nil { - return fmt.Errorf("failed to read data: %w", err) - } - fileSize = len(data) - reader = bytes.NewReader(data) + fileSize := 0 + reader := r + if seeker, ok := r.(io.Seeker); ok { + size, err := seeker.Seek(0, io.SeekEnd) + if err != nil { + return fmt.Errorf("failed to determine reader size by seeking: %w", err) } + fileSize = int(size) - _, err := s.client.UploadFileV2Context(ctx, slack.UploadFileV2Parameters{ - Reader: reader, - Filename: filename, - FileSize: fileSize, - Title: filename, - InitialComment: comment, - Channel: s.channelID, - }) - return err + // Reset the stream's cursor to the beginning. + // If we don't do this, the Slack client will start reading from the end of the stream + // and upload an empty data. + _, err = seeker.Seek(0, io.SeekStart) + if err != nil { + return fmt.Errorf("failed to seek back to start: %w", err) + } + } else { + data, err := io.ReadAll(r) + if err != nil { + return fmt.Errorf("failed to read data: %w", err) + } + fileSize = len(data) + reader = bytes.NewReader(data) } - _, err := s.client.UploadFileContext(ctx, slack.FileUploadParameters{ - Reader: r, + + _, err := s.client.UploadFileV2Context(ctx, slack.UploadFileV2Parameters{ + Reader: reader, Filename: filename, + FileSize: fileSize, Title: filename, InitialComment: comment, - Channels: []string{s.channel}, + Channel: s.channelID, }) return err }