From c5fda0902afdd57a87f01ae382a6cf12b3f754b6 Mon Sep 17 00:00:00 2001 From: wangzhuowei Date: Thu, 20 Jul 2023 15:38:57 +0800 Subject: [PATCH] feat: tpool - add native thread pool --- util/tpool/thread_pool_native.go | 34 ++++++++++++++++++++++++ util/tpool/thread_pool_test.go | 45 +++++++++++++++++++++++++++++--- 2 files changed, 75 insertions(+), 4 deletions(-) create mode 100644 util/tpool/thread_pool_native.go diff --git a/util/tpool/thread_pool_native.go b/util/tpool/thread_pool_native.go new file mode 100644 index 00000000..f0b4fc6c --- /dev/null +++ b/util/tpool/thread_pool_native.go @@ -0,0 +1,34 @@ +package tpool + +import ( + "runtime" + "sync" + "sync/atomic" +) + +func NewNativeThreadPool() ThreadPool { + tp := new(nativeThreadPool) + return tp +} + +type nativeThreadPool struct { + mu sync.Mutex + state int32 // 0: running, -1: closed +} + +func (tp *nativeThreadPool) Size() (size int) { + return runtime.GOMAXPROCS(0) +} + +func (tp *nativeThreadPool) Submit(t task) { + if atomic.LoadInt32(&tp.state) < 0 { // closed + return + } + go func() { + t() + }() +} + +func (tp *nativeThreadPool) Close() { + atomic.CompareAndSwapInt32(&tp.state, 0, -1) +} diff --git a/util/tpool/thread_pool_test.go b/util/tpool/thread_pool_test.go index 76f71a6b..3be7333b 100644 --- a/util/tpool/thread_pool_test.go +++ b/util/tpool/thread_pool_test.go @@ -2,8 +2,10 @@ package tpool import ( "fmt" + "runtime" "sync" "testing" + "time" ) type benchcase struct { @@ -13,8 +15,9 @@ type benchcase struct { func BenchmarkCPUTasks(b *testing.B) { cases := []benchcase{ - {name: "FixedThreadPool-4Threads", threadPool: NewFixedThreadPool(4)}, - {name: "CachedThreadPool-UnlimitedThreads", threadPool: NewCachedThreadPool(WithCachedMaxIdleThreads(32))}, + {name: "NewNativeThreadPool", threadPool: NewFixedThreadPool(runtime.NumCPU())}, + //{name: "FixedThreadPool", threadPool: NewFixedThreadPool(runtime.NumCPU())}, + {name: "CachedThreadPool", threadPool: NewCachedThreadPool(WithCachedMaxIdleThreads(32))}, } defer func() { for _, c := range cases { @@ -24,8 +27,8 @@ func BenchmarkCPUTasks(b *testing.B) { for _, c := range cases { b.Run(fmt.Sprintf("%s", c.name), func(b *testing.B) { - maxCPUTasks := 32 - for tasks := 1; tasks <= maxCPUTasks; tasks *= 2 { + maxTasks := 32 + for tasks := 1; tasks <= maxTasks; tasks *= 2 { b.Run(fmt.Sprintf("Tasks[%d]", tasks), func(b *testing.B) { for i := 0; i < b.N; i++ { var wg sync.WaitGroup @@ -47,3 +50,37 @@ func BenchmarkCPUTasks(b *testing.B) { }) } } + +func BenchmarkIOTasks(b *testing.B) { + cases := []benchcase{ + {name: "NativeThreadPool", threadPool: NewFixedThreadPool(runtime.NumCPU())}, + //{name: "FixedThreadPool", threadPool: NewFixedThreadPool(runtime.NumCPU())}, + {name: "CachedThreadPool", threadPool: NewCachedThreadPool(WithCachedMaxIdleThreads(32))}, + } + defer func() { + for _, c := range cases { + c.threadPool.Close() + } + }() + + for _, c := range cases { + b.Run(fmt.Sprintf("%s", c.name), func(b *testing.B) { + maxTasks := 32 + for tasks := 1; tasks <= maxTasks; tasks *= 2 { + b.Run(fmt.Sprintf("Tasks[%d]", tasks), func(b *testing.B) { + for i := 0; i < b.N; i++ { + var wg sync.WaitGroup + for t := 0; t < tasks; t++ { + wg.Add(1) + c.threadPool.Submit(func() { + defer wg.Done() + time.Sleep(time.Millisecond * 10) + }) + } + wg.Wait() + } + }) + } + }) + } +}