diff --git a/util/ptuner/metrics.go b/util/ptuner/metrics.go
new file mode 100644
index 00000000..e59ceb75
--- /dev/null
+++ b/util/ptuner/metrics.go
@@ -0,0 +1,123 @@
+package ptuner
+
+import (
+	"fmt"
+	"math"
+	"runtime/metrics"
+)
+
+const (
+	schedLatencyMetricName = "/sched/latencies:seconds"
+	cpuTotalMetricName     = "/cpu/classes/total:cpu-seconds"
+	cpuUserMetricName      = "/cpu/classes/user:cpu-seconds"
+	cpuIdleMetricName      = "/cpu/classes/idle:cpu-seconds"
+	cpuGCMetricName        = "/cpu/classes/gc/total:cpu-seconds"
+)
+
+func newRuntimeAnalyzer() *runtimeAnalyzer {
+	ra := &runtimeAnalyzer{
+		lastStat: readRuntimeStat(),
+	}
+	return ra
+}
+
+type runtimeAnalyzer struct {
+	lastStat runtimeStat
+}
+
+// Analyze reports the scheduling latency accumulated and the CPU utilization
+// observed since the previous call.
+func (r *runtimeAnalyzer) Analyze() (schedLatency, cpuPercent float64) {
+	stat := readRuntimeStat()
+	lastStat := r.lastStat
+	r.lastStat = stat
+
+	// scheduling latency accumulated during the interval
+	schedLatency = stat.latencyTotal - lastStat.latencyTotal
+
+	// CPU utilization during the interval: (total - idle) / total
+	total := stat.cpuTotal - lastStat.cpuTotal
+	idle := stat.cpuIdle - lastStat.cpuIdle
+	if total > 0 {
+		cpuPercent = (total - idle) / total
+	}
+	return schedLatency, cpuPercent
+}
+
+type runtimeStat struct {
+	latencyAvg, latencyP50, latencyP90, latencyP99, latencyTotal float64 // seconds
+	cpuTotal, cpuUser, cpuGC, cpuIdle                            float64 // seconds
+}
+
+func (stat runtimeStat) String() string {
+	ms := float64(1000)
+	return fmt.Sprintf(
+		"latency_avg=%.2fms latency_p50=%.2fms latency_p90=%.2fms latency_p99=%.2fms | "+
+			"cpu_total=%.2fs cpu_user=%.2fs cpu_gc=%.2fs cpu_idle=%.2fs",
+		stat.latencyAvg*ms, stat.latencyP50*ms, stat.latencyP90*ms, stat.latencyP99*ms,
+		stat.cpuTotal, stat.cpuUser, stat.cpuGC, stat.cpuIdle,
+	)
+}
+
+func readRuntimeStat() runtimeStat {
+	var metricSamples = []metrics.Sample{
+		{Name: schedLatencyMetricName},
+		{Name: cpuTotalMetricName},
+		{Name: cpuUserMetricName},
+		{Name: cpuGCMetricName},
+		{Name: cpuIdleMetricName},
+	}
+	metrics.Read(metricSamples)
+
+	var stat runtimeStat
+	stat.latencyAvg, stat.latencyP50, stat.latencyP90, stat.latencyP99, _, stat.latencyTotal = calculateSchedLatency(metricSamples[0])
+	stat.cpuTotal, stat.cpuUser, stat.cpuGC, stat.cpuIdle = calculateCPUSeconds(
+		metricSamples[1], metricSamples[2], metricSamples[3], metricSamples[4],
+	)
+	return stat
+}
+
+func calculateCPUSeconds(totalSample, userSample, gcSample, idleSample metrics.Sample) (total, user, gc, idle float64) {
+	total = totalSample.Value.Float64()
+	user = userSample.Value.Float64()
+	gc = gcSample.Value.Float64()
+	idle = idleSample.Value.Float64()
+	return
+}
+
+// calculateSchedLatency summarizes the scheduling latency histogram:
+// average, p50/p90/p99, the largest non-empty bucket, and the weighted total.
+func calculateSchedLatency(sample metrics.Sample) (avg, p50, p90, p99, max, total float64) {
+	var (
+		histogram  = sample.Value.Float64Histogram()
+		totalCount uint64
+		latestIdx  int
+	)
+
+	// count all samples and remember the last non-empty bucket
+	for idx, count := range histogram.Counts {
+		if count > 0 {
+			latestIdx = idx
+		}
+		totalCount += count
+	}
+	if totalCount == 0 {
+		return
+	}
+	p50Count := totalCount / 2
+	p90Count := uint64(float64(totalCount) * 0.90)
+	p99Count := uint64(float64(totalCount) * 0.99)
+
+	// walk the buckets and record the bucket lower bound at which each percentile is crossed
+	var ranged uint64
+	for idx, count := range histogram.Counts {
+		if count == 0 {
+			continue
+		}
+		ranged += count
+		latency := histogram.Buckets[idx]
+		if math.IsInf(latency, -1) {
+			// the first bucket boundary may be -Inf; treat it as zero latency
+			latency = 0
+		}
+		total += latency * float64(count)
+		if p50 == 0 && ranged >= p50Count {
+			p50 = latency
+		}
+		if p90 == 0 && ranged >= p90Count {
+			p90 = latency
+		}
+		if p99 == 0 && ranged >= p99Count {
+			p99 = latency
+		}
+	}
+	avg = total / float64(totalCount)
+	max = histogram.Buckets[latestIdx]
+	return
+}
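Not part of the diff: for context, metrics.go consumes the cumulative /sched/latencies:seconds histogram exposed by the standard runtime/metrics package (the /cpu/classes/* metrics used above additionally require Go 1.20+). A minimal standalone sketch of reading that histogram looks roughly like this:

    package main

    import (
    	"fmt"
    	"runtime/metrics"
    )

    func main() {
    	samples := []metrics.Sample{{Name: "/sched/latencies:seconds"}}
    	metrics.Read(samples)

    	// Counts[i] holds the number of samples in [Buckets[i], Buckets[i+1]);
    	// the counts are cumulative since process start, which is why the
    	// analyzer above diffs two consecutive reads.
    	h := samples[0].Value.Float64Histogram()
    	var total uint64
    	for _, c := range h.Counts {
    		total += c
    	}
    	fmt.Printf("buckets=%d samples=%d\n", len(h.Counts), total)
    }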
diff --git a/util/ptuner/metrics_test.go b/util/ptuner/metrics_test.go
new file mode 100644
index 00000000..f6283bd6
--- /dev/null
+++ b/util/ptuner/metrics_test.go
@@ -0,0 +1,57 @@
+package ptuner
+
+import (
+	"runtime"
+	"sync"
+	"sync/atomic"
+	"testing"
+	"time"
+)
+
+// userFunc generates both CPU and GC work (recursive calls plus slice allocations).
+//
+//go:noinline
+func userFunc(n int) (ret int) {
+	if n == 0 {
+		return 0
+	}
+	sum := make([]int, n)
+	for i := 0; i < n; i++ {
+		sum[i] = userFunc(i / 2)
+	}
+	for _, x := range sum {
+		ret += x
+	}
+	return ret
+}
+
+func TestMetrics(t *testing.T) {
+	old := runtime.GOMAXPROCS(4)
+	defer runtime.GOMAXPROCS(old)
+
+	var stop int32
+	var wg sync.WaitGroup
+	for i := 0; i < 2; i++ {
+		wg.Add(1)
+		go func(i int) {
+			defer wg.Done()
+			sum := 0
+			for atomic.LoadInt32(&stop) == 0 {
+				sum += userFunc(i * 10)
+			}
+			t.Logf("goroutine[%d] exit", i)
+		}(i)
+	}
+
+	// sample the analyzer once per second, then stop the background goroutines
+	ra := newRuntimeAnalyzer()
+	for i := 0; i < 6; i++ {
+		time.Sleep(time.Second)
+		schedLatency, cpuPercent := ra.Analyze()
+		t.Logf("schedLatency=%.2fms cpuPercent=%.2f%%", schedLatency*1000, cpuPercent*100)
+	}
+	atomic.StoreInt32(&stop, 1)
+	t.Logf("stop background goroutines")
+	wg.Wait()
+}
diff --git a/util/ptuner/tuner.go b/util/ptuner/tuner.go
new file mode 100644
index 00000000..746315b1
--- /dev/null
+++ b/util/ptuner/tuner.go
@@ -0,0 +1,177 @@
+package ptuner
+
+import (
+	"fmt"
+	"log"
+	"runtime"
+	"sync/atomic"
+	"time"
+)
+
+const (
+	defaultTuningFrequency = time.Minute
+)
+
+type Option func(t *tuner)
+
+// WithMaxProcs sets the upper bound of GOMAXPROCS the tuner may try.
+func WithMaxProcs(maxProcs int) Option {
+	return func(t *tuner) {
+		t.maxProcs = maxProcs
+	}
+}
+
+// WithMinProcs sets the lower bound of GOMAXPROCS the tuner may try.
+func WithMinProcs(minProcs int) Option {
+	return func(t *tuner) {
+		t.minProcs = minProcs
+	}
+}
+
+// WithTuningFrequency sets how often the tuner re-evaluates GOMAXPROCS.
+func WithTuningFrequency(duration time.Duration) Option {
+	return func(t *tuner) {
+		t.tuningFrequency = duration
+	}
+}
+
+// WithTuningLimit sets the maximum number of GOMAXPROCS changes before the tuner exits.
+func WithTuningLimit(limit int) Option {
+	return func(t *tuner) {
+		t.tuningLimit = limit
+	}
+}
+
+var tuningOnce int32
+
+// Tuning starts a background goroutine that periodically adjusts GOMAXPROCS
+// based on scheduling latency and CPU utilization. It can only be called once.
+func Tuning(opts ...Option) error {
+	if atomic.AddInt32(&tuningOnce, 1) > 1 {
+		return fmt.Errorf("tuning can only be started once")
+	}
+	t := new(tuner)
+	t.tuningFrequency = defaultTuningFrequency
+	t.oriProcs = runtime.GOMAXPROCS(0)
+	t.curProcs = t.oriProcs
+	t.minProcs = t.oriProcs
+	t.maxProcs = t.oriProcs * 2
+	for _, opt := range opts {
+		opt(t)
+	}
+	if t.minProcs <= 0 {
+		t.minProcs = 1
+	}
+	if t.maxProcs < t.minProcs {
+		return fmt.Errorf("maxProcs should be >= minProcs")
+	}
+	if t.tuningFrequency < time.Second {
+		return fmt.Errorf("tuningFrequency should be >= 1s")
+	}
+	t.schedPerf = make([]float64, t.maxProcs+1)
+	t.cpuPerf = make([]float64, t.maxProcs+1)
+	t.ra = newRuntimeAnalyzer()
+	go t.tuning()
+	return nil
+}
+
+type tuner struct {
+	oriProcs  int // original GOMAXPROCS before tuning
+	curProcs  int // current GOMAXPROCS
+	minProcs  int // same as oriProcs by default
+	maxProcs  int // same as oriProcs*2 by default
+	lastProcs int // GOMAXPROCS value before the most recent change
+	tuningFrequency time.Duration
+	tuningLimit     int
+	// runtime data
+	ra        *runtimeAnalyzer
+	schedPerf []float64
+	cpuPerf   []float64
+}
+
+func (t *tuner) tuning() {
+	log.Printf("PTuning: start tuning with min_procs=%d max_procs=%d", t.minProcs, t.maxProcs)
+
+	ticker := time.NewTicker(t.tuningFrequency)
+	defer ticker.Stop()
+	tuned := 0
+	loop := 0
+	reportInterval := int(10 * time.Minute / t.tuningFrequency)
+	if reportInterval <= 0 {
+		reportInterval = 1
+	}
+	for range ticker.C {
+		loop++
+		if t.tuningLimit != 0 && tuned >= t.tuningLimit {
+			log.Printf("PTuning: hit tuning limit[%d], exit", t.tuningLimit)
+			return
+		}
+		if loop%reportInterval == 0 {
+			t.report()
+		}
+
+		newProcs := t.adjustProcs()
+		if t.curProcs != newProcs {
+			log.Printf("PTuning: change GOMAXPROCS from %d to %d", t.curProcs, newProcs)
+			runtime.GOMAXPROCS(newProcs)
+			t.lastProcs = t.curProcs
+			t.curProcs = newProcs
+			tuned++
+		}
+	}
+}
+
+func (t *tuner) adjustProcs() int {
+	// record the perf data observed with the current GOMAXPROCS
+	schedLatency, cpuPercent := t.ra.Analyze()
+	t.schedPerf[t.curProcs] = schedLatency
+	t.cpuPerf[t.curProcs] = cpuPercent
+	log.Printf("PTuning: runtime analyze: sched_latency=%.6fms, cpu_percent=%.2f%%", schedLatency*1000, cpuPercent*100)
+
+	if t.lastProcs > 0 && t.lastProcs != t.curProcs {
+		// evaluate the effect of the last change:
+		// if it increased CPU usage too much, fall back to the previous proc number
+		if cpuPercent-t.cpuPerf[t.lastProcs] >= 0.05 {
+			return t.lastProcs
+		}
+	}
+
+	// untested proc numbers still hold zero latency, so in the beginning this
+	// always explores the next higher proc number
+	if t.curProcs < t.maxProcs && schedLatency > t.schedPerf[t.curProcs+1] {
+		return t.curProcs + 1
+	}
+	// if the previous proc number recorded better latency, move back to it
+	if t.curProcs > t.minProcs && schedLatency > t.schedPerf[t.curProcs-1] {
+		return t.curProcs - 1
+	}
+	// otherwise step one proc at a time toward the best recorded proc number;
+	// even if we can guess the best choice, we still change it slowly
+	bestP := t.bestSchedProc()
+	if bestP > t.curProcs && t.curProcs < t.maxProcs {
+		return t.curProcs + 1
+	}
+	if bestP < t.curProcs && t.curProcs > t.minProcs {
+		return t.curProcs - 1
+	}
+	return t.curProcs
+}
+
+// bestSchedProc returns the proc number with the lowest recorded scheduling latency.
+func (t *tuner) bestSchedProc() int {
+	bestP := t.curProcs
+	bestLatency := t.schedPerf[t.curProcs]
+	for pn := t.minProcs; pn <= t.maxProcs; pn++ {
+		if t.schedPerf[pn] < bestLatency {
+			bestP = pn
+			bestLatency = t.schedPerf[pn]
+		}
+	}
+	return bestP
+}
+
+// bestCPUProc returns the proc number with the lowest recorded CPU utilization.
+func (t *tuner) bestCPUProc() int {
+	bestP := t.curProcs
+	bestCPU := t.cpuPerf[t.curProcs]
+	for pn := t.minProcs; pn <= t.maxProcs; pn++ {
+		if t.cpuPerf[pn] < bestCPU {
+			bestP = pn
+			bestCPU = t.cpuPerf[pn]
+		}
+	}
+	return bestP
+}
+
+func (t *tuner) report() {
+	for pn := t.minProcs; pn <= t.maxProcs; pn++ {
+		log.Printf("PTuning: reporting pnumber=%d sched_latency=%.6fms, cpu_percent=%.2f%%",
+			pn, t.schedPerf[pn]*1000, t.cpuPerf[pn]*100,
+		)
+	}
+}
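Not part of the diff: a minimal usage sketch of the API added in tuner.go. The import path below is a placeholder for this repository's module path, and the option values are illustrative only, not recommendations.

    package main

    import (
    	"log"
    	"time"

    	"example.com/yourmodule/util/ptuner" // placeholder; use this repository's module path
    )

    func main() {
    	// Start background GOMAXPROCS tuning once, early in main.
    	err := ptuner.Tuning(
    		ptuner.WithMinProcs(4),
    		ptuner.WithMaxProcs(16),
    		ptuner.WithTuningFrequency(30*time.Second), // must be >= 1s
    		ptuner.WithTuningLimit(10),                 // stop after 10 adjustments
    	)
    	if err != nil {
    		log.Fatalf("ptuner: %v", err)
    	}
    	// ... run the actual workload ...
    }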
diff --git a/util/ptuner/tuner_test.go b/util/ptuner/tuner_test.go
new file mode 100644
index 00000000..9930ddd0
--- /dev/null
+++ b/util/ptuner/tuner_test.go
@@ -0,0 +1,36 @@
+package ptuner
+
+import (
+	"runtime"
+	"sync/atomic"
+	"testing"
+	"time"
+)
+
+func TestTuner(t *testing.T) {
+	old := runtime.GOMAXPROCS(2)
+	defer runtime.GOMAXPROCS(old)
+
+	err := Tuning(
+		WithMaxProcs(5),
+		//WithTuningLimit(3),
+		WithTuningFrequency(time.Second),
+	)
+	if err != nil {
+		t.Fatalf("Tuning failed: %v", err)
+	}
+
+	// run busy goroutines for a while, then stop them and keep observing
+	var stop int32
+	for i := 0; i < 10; i++ {
+		go func(id int) {
+			sum := 0
+			for atomic.LoadInt32(&stop) == 0 {
+				sum += userFunc(id*10 + 1)
+			}
+		}(i)
+	}
+
+	time.Sleep(time.Second * 20)
+	atomic.StoreInt32(&stop, 1)
+	time.Sleep(time.Second * 20)
+}
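Not part of the diff: with the standard Go toolchain, the new tests can be exercised locally from the repository root with

    go test -v -run 'TestMetrics|TestTuner' ./util/ptuner/

Both tests deliberately sleep (roughly 6s and 40s respectively) so the analyzer and tuner can collect samples; the -v log output is the main signal rather than assertions.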