diff --git a/lang/runtimex/runtime.go b/lang/runtimex/runtime.go index 0fa371ef..7968dc1d 100644 --- a/lang/runtimex/runtime.go +++ b/lang/runtimex/runtime.go @@ -23,6 +23,9 @@ import ( //go:linkname Fastrand runtime.fastrand func Fastrand() uint32 +//go:linkname Nanotime runtime.nanotime +func Nanotime() int64 + func getg() uintptr type puintptr uintptr diff --git a/util/gopool/pool_test.go b/util/gopool/pool_test.go index 6d81bec9..7fc56248 100644 --- a/util/gopool/pool_test.go +++ b/util/gopool/pool_test.go @@ -15,6 +15,7 @@ package gopool import ( + "math" "runtime" "sync" "sync/atomic" @@ -55,6 +56,30 @@ func TestPool(t *testing.T) { } } +func TestCPUBoundTask(t *testing.T) { + runtime.GOMAXPROCS(4) + p := NewPool("test", math.MaxInt32, NewConfig()) + var wg sync.WaitGroup + var tasks = 1000 + var loop = 1000000 + for c := 0; c < tasks; c++ { + wg.Add(1) + n := c + p.Go(func() { + defer wg.Done() + data := make([]byte, 4096) + for i := 0; i < loop; i++ { + data[i%len(data)] = 'a' + byte(i%26) + byte(n) + if i%100000 == 0 { + testFunc() + } + } + _ = data + }) + } + wg.Wait() +} + func TestPoolPanic(t *testing.T) { p := NewPool("test", 100, NewConfig()) p.Go(testPanicFunc) diff --git a/util/gopool/worker.go b/util/gopool/worker.go index 85e048c1..91c4e9ed 100644 --- a/util/gopool/worker.go +++ b/util/gopool/worker.go @@ -20,6 +20,7 @@ import ( "sync" "sync/atomic" + "github.com/bytedance/gopkg/lang/runtimex" "github.com/bytedance/gopkg/util/logger" ) @@ -30,17 +31,24 @@ func init() { } type worker struct { - pool *pool + pool *pool + updateSched bool } func newWorker() interface{} { - return &worker{} + return &worker{ + updateSched: true, + } } func (w *worker) run() { go func() { + g := runtimex.GetG() + m := g.M() + p := m.P() for { var t *task + now := runtimex.Nanotime() w.pool.taskLock.Lock() if w.pool.taskHead != nil { t = w.pool.taskHead @@ -54,6 +62,14 @@ func (w *worker) run() { w.Recycle() return } + + // update sysmon tick + if w.updateSched { + st := *p.Sysmontick() + st.Schedwhen = now + *p.Sysmontick() = st + } + w.pool.taskLock.Unlock() func() { defer func() {