Skip to content

Commit

Permalink
feat: proc tuner
Browse files Browse the repository at this point in the history
  • Loading branch information
joway committed Aug 12, 2024
1 parent 3042f73 commit b38d25c
Show file tree
Hide file tree
Showing 4 changed files with 393 additions and 0 deletions.
123 changes: 123 additions & 0 deletions util/ptuner/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package ptuner

import (
"fmt"
"runtime/metrics"

Check failure on line 5 in util/ptuner/metrics.go

View workflow job for this annotation

GitHub Actions / build

package runtime/metrics is not in GOROOT (/opt/hostedtoolcache/go/1.15.15/x64/src/runtime/metrics)
)

const (
schedLatencyMetricName = "/sched/latencies:seconds"
cpuTotalMetricName = "/cpu/classes/total:cpu-seconds"
cpuUserMetricName = "/cpu/classes/user:cpu-seconds"
cpuIdleMetricName = "/cpu/classes/idle:cpu-seconds"
cpuGCMetricName = "/cpu/classes/gc/total:cpu-seconds"
)

// newRuntimeAnalyzer returns an analyzer primed with a fresh snapshot,
// so the first Analyze call measures from construction time onward.
func newRuntimeAnalyzer() *runtimeAnalyzer {
	return &runtimeAnalyzer{lastStat: readRuntimeStat()}
}

// runtimeAnalyzer turns cumulative runtime metrics into per-interval
// deltas between consecutive Analyze calls.
type runtimeAnalyzer struct {
	lastStat runtimeStat // snapshot taken at the previous Analyze (or at construction)
}

// Analyze reports the scheduling latency accumulated and the CPU busy
// ratio observed since the previous call (or since construction).
// NOTE(review): not safe for concurrent callers — lastStat is updated
// without synchronization; confirm it is only called from one goroutine.
func (r *runtimeAnalyzer) Analyze() (schedLatency, cpuPercent float64) {
	current := readRuntimeStat()
	previous := r.lastStat
	r.lastStat = current

	// Latency total is cumulative, so the difference is this window's share.
	schedLatency = current.latencyTotal - previous.latencyTotal

	// Busy ratio over the window: (total - idle) / total.
	totalCPU := current.cpuTotal - previous.cpuTotal
	idleCPU := current.cpuIdle - previous.cpuIdle
	if totalCPU > 0 {
		cpuPercent = (totalCPU - idleCPU) / totalCPU
	}
	return schedLatency, cpuPercent
}

// runtimeStat is a point-in-time snapshot of runtime metrics. The latency
// percentiles describe the scheduler-latency histogram at sampling time;
// latencyTotal and the cpu* counters are cumulative, so subtracting two
// snapshots yields per-window figures.
type runtimeStat struct {
	latencyAvg, latencyP50, latencyP90, latencyP99, latencyTotal float64 // seconds
	cpuTotal, cpuUser, cpuGC, cpuIdle float64 // seconds
}

// String renders the snapshot with latencies converted to milliseconds and
// CPU counters left in seconds.
func (stat runtimeStat) String() string {
	const toMs = float64(1000) // seconds -> milliseconds
	return fmt.Sprintf(
		"latency_avg=%.2fms latency_p50=%.2fms latency_p90=%.2fms latency_p99=%.2fms | "+
			"cpu_total=%.2fs cpu_user=%.2fs cpu_gc=%.2fs cpu_idle=%.2fs",
		stat.latencyAvg*toMs, stat.latencyP50*toMs, stat.latencyP90*toMs, stat.latencyP99*toMs,
		stat.cpuTotal, stat.cpuUser, stat.cpuGC, stat.cpuIdle,
	)
}

// readRuntimeStat samples the scheduler-latency histogram and the CPU-class
// counters from runtime/metrics and folds them into a runtimeStat.
func readRuntimeStat() runtimeStat {
	samples := []metrics.Sample{
		{Name: schedLatencyMetricName},
		{Name: cpuTotalMetricName},
		{Name: cpuUserMetricName},
		{Name: cpuGCMetricName},
		{Name: cpuIdleMetricName},
	}
	metrics.Read(samples)

	var stat runtimeStat
	stat.latencyAvg, stat.latencyP50, stat.latencyP90, stat.latencyP99, _, stat.latencyTotal =
		calculateSchedLatency(samples[0])
	stat.cpuTotal, stat.cpuUser, stat.cpuGC, stat.cpuIdle =
		calculateCPUSeconds(samples[1], samples[2], samples[3], samples[4])
	return stat
}

// calculateCPUSeconds unwraps the float64 payloads of the four CPU-class
// samples, in (total, user, gc, idle) order.
func calculateCPUSeconds(totalSample, userSample, gcSample, idleSample metrics.Sample) (total, user, gc, idle float64) {
	return totalSample.Value.Float64(),
		userSample.Value.Float64(),
		gcSample.Value.Float64(),
		idleSample.Value.Float64()
}

// calculateSchedLatency summarizes the scheduler-latency histogram sample
// into avg/p50/p90/p99/max/total, all in seconds.
func calculateSchedLatency(sample metrics.Sample) (avg, p50, p90, p99, max, total float64) {
	histogram := sample.Value.Float64Histogram()
	return schedLatencyFromHistogram(histogram.Counts, histogram.Buckets)
}

// schedLatencyFromHistogram computes the latency summary from raw histogram
// data: counts[i] observations fall in the bucket whose lower boundary is
// buckets[i] (len(buckets) == len(counts)+1, per metrics.Float64Histogram).
// Each percentile is approximated by the boundary of the bucket in which the
// cumulative count first reaches that percentile's rank.
func schedLatencyFromHistogram(counts []uint64, buckets []float64) (avg, p50, p90, p99, max, total float64) {
	var (
		totalCount uint64
		latestIdx  int
	)
	for idx, count := range counts {
		if count > 0 {
			latestIdx = idx
		}
		totalCount += count
	}
	if totalCount == 0 {
		// BUG FIX: no observations used to produce avg = 0/0 = NaN,
		// which then poisoned every comparison downstream.
		return
	}
	p50Count := totalCount / 2
	p90Count := uint64(float64(totalCount) * 0.90)
	p99Count := uint64(float64(totalCount) * 0.99)

	// range buckets
	var ranged uint64
	for idx, count := range counts {
		if count == 0 {
			continue
		}
		ranged += count
		latency := buckets[idx]
		total += latency * float64(count)
		// BUG FIX: these checks were an else-if chain ordered p99 first,
		// so a single bucket crossing several percentile ranks set only
		// p99 and left p50/p90 at zero (or assigned them a later, larger
		// bucket). Each percentile must be checked independently.
		if p50 == 0 && ranged >= p50Count {
			p50 = latency
		}
		if p90 == 0 && ranged >= p90Count {
			p90 = latency
		}
		if p99 == 0 && ranged >= p99Count {
			p99 = latency
		}
	}
	avg = total / float64(totalCount)
	max = buckets[latestIdx]
	return
}
57 changes: 57 additions & 0 deletions util/ptuner/metrics_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package ptuner

import (
"runtime"
"sync"
"sync/atomic"
"testing"
"time"
)

// userFunc is a synthetic workload that burns CPU and allocates slices to
// create GC pressure. Every element resolves to 0 through the recursion, so
// the function always returns 0; the value only exists to defeat dead-code
// elimination.
//
//go:noinline
func userFunc(n int) (ret int) {
	if n == 0 {
		return 0
	}
	parts := make([]int, n)
	for i := range parts {
		parts[i] = userFunc(i / 2)
	}
	for _, v := range parts {
		ret += v
	}
	return ret
}

// TestMetrics runs two busy goroutines, then samples the runtime analyzer
// once per second and logs the latency/CPU figures it reports.
func TestMetrics(t *testing.T) {
	old := runtime.GOMAXPROCS(4)
	defer runtime.GOMAXPROCS(old)

	var stop int32
	var wg sync.WaitGroup
	for i := 0; i < 2; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			sum := 0
			for atomic.LoadInt32(&stop) == 0 {
				sum += userFunc(i * 10)
			}
			t.Logf("goroutine[%d] exit", i)
		}(i)
	}

	ra := newRuntimeAnalyzer()
	// BUG FIX: the original loop was `for i := 0; ; i++` and never broke
	// out, so the test hung forever (until the go test timeout) after
	// stopping the workers at i == 5.
	for i := 0; i <= 5; i++ {
		time.Sleep(time.Second)
		schedLatency, cpuPercent := ra.Analyze()
		t.Logf("schedLatency=%.2fms cpuPercent=%.2f%%", schedLatency*1000, cpuPercent*100)
	}
	atomic.StoreInt32(&stop, 1)
	t.Logf("stop background goroutines")
	// Wait for the workers so their t.Logf cannot fire after the test ends.
	wg.Wait()
}
177 changes: 177 additions & 0 deletions util/ptuner/tuner.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
package ptuner

import (
"fmt"
"log"
"runtime"
"sync/atomic"
"time"
)

const (
defaultTuningFrequency = time.Minute
)

type Option func(t *tuner)

// WithMaxProcs sets the largest GOMAXPROCS value the tuner may try
// (default: twice the GOMAXPROCS observed when Tuning was called).
func WithMaxProcs(maxProcs int) Option {
	return func(tn *tuner) { tn.maxProcs = maxProcs }
}

// WithMinProcs sets the smallest GOMAXPROCS value the tuner may try
// (default: the GOMAXPROCS observed when Tuning was called).
func WithMinProcs(minProcs int) Option {
	return func(tn *tuner) { tn.minProcs = minProcs }
}

// WithTuningFrequency sets how often the tuner re-evaluates GOMAXPROCS;
// values below one second are rejected by Tuning.
func WithTuningFrequency(duration time.Duration) Option {
	return func(tn *tuner) { tn.tuningFrequency = duration }
}

// WithTuningLimit caps how many times the tuner may change GOMAXPROCS
// before it exits; zero (the default) means no limit.
func WithTuningLimit(limit int) Option {
	return func(tn *tuner) { tn.tuningLimit = limit }
}

// tuningOnce guards against Tuning being enabled more than once per process.
var tuningOnce int32

// Tuning validates the options and starts the background GOMAXPROCS tuner.
// It may be called successfully at most once per process; the tuner then
// runs asynchronously until the optional tuning limit is reached.
func Tuning(opts ...Option) error {
	if atomic.AddInt32(&tuningOnce, 1) > 1 {
		return fmt.Errorf("tuning can only be enabled once")
	}
	t := new(tuner)
	t.tuningFrequency = defaultTuningFrequency
	t.oriProcs = runtime.GOMAXPROCS(0)
	t.curProcs = t.oriProcs
	t.minProcs = t.oriProcs
	if t.minProcs <= 0 {
		t.minProcs = 1
	}
	t.maxProcs = t.oriProcs * 2
	for _, opt := range opts {
		opt(t)
	}
	if t.tuningFrequency < time.Second {
		return fmt.Errorf("tuningFrequency should >= 1s")
	}
	if t.minProcs <= 0 || t.minProcs > t.maxProcs {
		return fmt.Errorf("invalid procs range: minProcs=%d maxProcs=%d", t.minProcs, t.maxProcs)
	}
	// BUG FIX: the perf tables were sized maxProcs+1 but are indexed by
	// curProcs, which starts at the original GOMAXPROCS. WithMaxProcs
	// below that value caused an index-out-of-range panic in adjustProcs.
	// Size them to cover every procs value the tuner can observe.
	size := t.maxProcs + 1
	if t.oriProcs+1 > size {
		size = t.oriProcs + 1
	}
	t.schedPerf = make([]float64, size)
	t.cpuPerf = make([]float64, size)
	t.ra = newRuntimeAnalyzer()
	go t.tuning()
	return nil
}

// tuner holds the state of the background GOMAXPROCS auto-tuner: the
// allowed procs range, the change budget, and per-procs performance data
// collected while probing different GOMAXPROCS values.
type tuner struct {
	oriProcs int // original GOMAXPROCS before tuning started
	curProcs int // current GOMAXPROCS
	minProcs int // lower bound; defaults to oriProcs
	maxProcs int // upper bound; defaults to oriProcs*2
	lastProcs int // GOMAXPROCS before the most recent change; 0 until the first change
	tuningFrequency time.Duration // interval between adjustments
	tuningLimit int // max number of GOMAXPROCS changes; 0 means unlimited
	// runtime data
	ra *runtimeAnalyzer // per-interval sched-latency / CPU source
	schedPerf []float64 // sched latency observed per procs value, indexed by GOMAXPROCS
	cpuPerf []float64 // cpu percent observed per procs value, indexed by GOMAXPROCS
}

// tuning is the background loop: every tuningFrequency it re-evaluates the
// runtime metrics and moves GOMAXPROCS by at most one step, exiting once
// the optional tuningLimit has been spent.
func (t *tuner) tuning() {
	log.Printf("PTuning: start tuning with min_procs=%d max_procs=%d", t.minProcs, t.maxProcs)

	ticker := time.NewTicker(t.tuningFrequency)
	defer ticker.Stop()
	tuned := 0
	loop := 0
	// Report roughly every 10 minutes, expressed in ticks.
	reportInterval := int(10 * time.Minute / t.tuningFrequency)
	if reportInterval <= 0 {
		// BUG FIX: a tuningFrequency above 10 minutes made this 0 and the
		// `loop%reportInterval` below panicked with a division by zero.
		reportInterval = 1
	}
	for range ticker.C {
		loop++
		if t.tuningLimit != 0 && tuned >= t.tuningLimit {
			log.Printf("PTuning: hit tuning limit[%d], exit", t.tuningLimit)
			return
		}
		if loop%reportInterval == 0 {
			t.report()
		}

		newProcs := t.adjustProcs()
		if t.curProcs != newProcs {
			log.Printf("PTuning: change GOMAXPROCS from %d to %d", t.curProcs, newProcs)
			runtime.GOMAXPROCS(newProcs)
			t.lastProcs = t.curProcs
			t.curProcs = newProcs
			tuned++
		}
	}
}

// adjustProcs records the latest measurements for the current GOMAXPROCS and
// returns the procs value to use next. It moves by at most one step per call,
// preferring the procs value with the lowest observed scheduling latency, and
// rolls back a change that raised CPU usage too much.
func (t *tuner) adjustProcs() int {
	// save current perf data for the current procs value
	schedLatency, cpuPercent := t.ra.Analyze()
	t.schedPerf[t.curProcs] = schedLatency
	t.cpuPerf[t.curProcs] = cpuPercent
	log.Printf("PTuning: runtime analyze: sched_latency=%.6fms, cpu_percent=%.2f%%", schedLatency*1000, cpuPercent*100)

	if t.lastProcs > 0 && t.lastProcs != t.curProcs {
		// Evaluate the previous change: if it raised CPU usage by 5
		// percentage points or more, fall back to the last procs value.
		if cpuPercent-t.cpuPerf[t.lastProcs] >= 0.05 {
			return t.lastProcs
		}
	}

	// Early on the perf tables are zero-filled, so an unexplored upper
	// neighbor (recorded latency 0) always compares better and gets probed.
	if t.curProcs < t.maxProcs && schedLatency > t.schedPerf[t.curProcs+1] {
		return t.curProcs + 1
	}
	// If the lower neighbor recorded better (lower) latency, step down.
	if t.curProcs > t.minProcs && schedLatency > t.schedPerf[t.curProcs-1] {
		return t.curProcs - 1
	}
	// Otherwise walk one step toward the best recorded procs value; even
	// when the best value is known, change gradually.
	bestP := t.bestSchedProc()
	if bestP > t.curProcs && t.curProcs < t.maxProcs {
		return t.curProcs + 1
	}
	if bestP < t.curProcs && t.curProcs > t.minProcs {
		return t.curProcs - 1
	}
	return t.curProcs
}

// bestSchedProc returns the procs value in [minProcs, maxProcs] with the
// lowest recorded scheduling latency, defaulting to the current value.
func (t *tuner) bestSchedProc() int {
	best, bestLatency := t.curProcs, t.schedPerf[t.curProcs]
	for pn := t.minProcs; pn <= t.maxProcs; pn++ {
		if t.schedPerf[pn] < bestLatency {
			best, bestLatency = pn, t.schedPerf[pn]
		}
	}
	return best
}

// bestCPUProc returns the procs value in [minProcs, maxProcs] with the
// lowest recorded CPU usage, defaulting to the current value.
func (t *tuner) bestCPUProc() int {
	best, bestCPU := t.curProcs, t.cpuPerf[t.curProcs]
	for pn := t.minProcs; pn <= t.maxProcs; pn++ {
		if t.cpuPerf[pn] < bestCPU {
			best, bestCPU = pn, t.cpuPerf[pn]
		}
	}
	return best
}

// report logs the recorded latency/CPU data for every candidate procs value.
func (t *tuner) report() {
	for pn := t.minProcs; pn <= t.maxProcs; pn++ {
		log.Printf("PTuning: reporting pnumber=%d sched_latency=%.6fms, cpu_percent=%.2f%%",
			pn, t.schedPerf[pn]*1000, t.cpuPerf[pn]*100,
		)
	}
}
36 changes: 36 additions & 0 deletions util/ptuner/tuner_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package ptuner

import (
"runtime"
"sync/atomic"
"testing"
"time"
)

// TestTuner starts the tuner against a synthetic CPU/GC workload and gives
// it time to adjust GOMAXPROCS, then time to settle after the load stops.
func TestTuner(t *testing.T) {
	old := runtime.GOMAXPROCS(2)
	defer runtime.GOMAXPROCS(old)

	err := Tuning(
		WithMaxProcs(5),
		//WithTuningLimit(3),
		WithTuningFrequency(time.Second),
	)
	if err != nil {
		t.FailNow()
	}

	var stop int32
	for i := 0; i < 10; i++ {
		go func(id int) {
			acc := 0
			for atomic.LoadInt32(&stop) == 0 {
				acc += userFunc(id*10 + 1)
			}
			_ = acc // result unused; the work itself is the point
		}(i)
	}

	time.Sleep(time.Second * 20)
	atomic.StoreInt32(&stop, 1)
	time.Sleep(time.Second * 20)
}

0 comments on commit b38d25c

Please sign in to comment.