Skip to content

Commit

Permalink
pkg/ratelimit: introduce an executor that can run with a limiter (#8024)
Browse files Browse the repository at this point in the history
ref #7897

pkg/ratelimit: Introduce an executor that can run with a limiter
- async runner can run the task with the limiter, the task will parallel run.
- sync runner used to keep the behavior like before

Signed-off-by: nolouch <[email protected]>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
nolouch and ti-chi-bot[bot] authored Apr 9, 2024
1 parent ad3387c commit 35e8e95
Show file tree
Hide file tree
Showing 5 changed files with 410 additions and 24 deletions.
99 changes: 84 additions & 15 deletions pkg/ratelimit/concurrency_limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,24 +14,33 @@

package ratelimit

import "github.com/tikv/pd/pkg/utils/syncutil"
import (
"context"

type concurrencyLimiter struct {
mu syncutil.RWMutex
"github.com/tikv/pd/pkg/utils/syncutil"
)

// ConcurrencyLimiter is a limiter that limits the number of concurrent tasks.
type ConcurrencyLimiter struct {
mu syncutil.Mutex
current uint64
waiting uint64
limit uint64

// statistic
maxLimit uint64
queue chan *TaskToken
}

func newConcurrencyLimiter(limit uint64) *concurrencyLimiter {
return &concurrencyLimiter{limit: limit}
// NewConcurrencyLimiter creates a new ConcurrencyLimiter.
func NewConcurrencyLimiter(limit uint64) *ConcurrencyLimiter {
return &ConcurrencyLimiter{limit: limit, queue: make(chan *TaskToken, limit)}
}

const unlimit = uint64(0)

func (l *concurrencyLimiter) allow() bool {
// old interface. only used in the ratelimiter package.
func (l *ConcurrencyLimiter) allow() bool {
l.mu.Lock()
defer l.mu.Unlock()

Expand All @@ -45,7 +54,8 @@ func (l *concurrencyLimiter) allow() bool {
return false
}

func (l *concurrencyLimiter) release() {
// old interface. only used in the ratelimiter package.
func (l *ConcurrencyLimiter) release() {
l.mu.Lock()
defer l.mu.Unlock()

Expand All @@ -54,28 +64,32 @@ func (l *concurrencyLimiter) release() {
}
}

func (l *concurrencyLimiter) getLimit() uint64 {
l.mu.RLock()
defer l.mu.RUnlock()
// old interface. only used in the ratelimiter package.
func (l *ConcurrencyLimiter) getLimit() uint64 {
l.mu.Lock()
defer l.mu.Unlock()

return l.limit
}

func (l *concurrencyLimiter) setLimit(limit uint64) {
// old interface. only used in the ratelimiter package.
func (l *ConcurrencyLimiter) setLimit(limit uint64) {
l.mu.Lock()
defer l.mu.Unlock()

l.limit = limit
}

func (l *concurrencyLimiter) getCurrent() uint64 {
l.mu.RLock()
defer l.mu.RUnlock()
// GetRunningTasksNum returns the number of running tasks.
func (l *ConcurrencyLimiter) GetRunningTasksNum() uint64 {
l.mu.Lock()
defer l.mu.Unlock()

return l.current
}

func (l *concurrencyLimiter) getMaxConcurrency() uint64 {
// old interface. only used in the ratelimiter package.
func (l *ConcurrencyLimiter) getMaxConcurrency() uint64 {
l.mu.Lock()
defer func() {
l.maxLimit = l.current
Expand All @@ -84,3 +98,58 @@ func (l *concurrencyLimiter) getMaxConcurrency() uint64 {

return l.maxLimit
}

// GetWaitingTasksNum returns the number of waiting tasks.
func (l *ConcurrencyLimiter) GetWaitingTasksNum() uint64 {
l.mu.Lock()
defer l.mu.Unlock()
return l.waiting
}

// Acquire acquires a token from the limiter. which will block until a token is available or ctx is done, like Timeout.
func (l *ConcurrencyLimiter) Acquire(ctx context.Context) (*TaskToken, error) {
l.mu.Lock()
if l.current >= l.limit {
l.waiting++
l.mu.Unlock()
// block the waiting task on the caller goroutine
select {
case <-ctx.Done():
l.mu.Lock()
l.waiting--
l.mu.Unlock()
return nil, ctx.Err()
case token := <-l.queue:
l.mu.Lock()
token.released = false
l.current++
l.waiting--
l.mu.Unlock()
return token, nil
}
}
l.current++
token := &TaskToken{limiter: l}
l.mu.Unlock()
return token, nil
}

// TaskToken is a token that must be released after the task is done.
type TaskToken struct {
released bool
limiter *ConcurrencyLimiter
}

// Release releases the token.
func (tt *TaskToken) Release() {
tt.limiter.mu.Lock()
defer tt.limiter.mu.Unlock()
if tt.released {
return
}
tt.released = true
tt.limiter.current--
if len(tt.limiter.queue) < int(tt.limiter.limit) {
tt.limiter.queue <- tt
}
}
82 changes: 78 additions & 4 deletions pkg/ratelimit/concurrency_limiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,20 @@
package ratelimit

import (
"context"
"fmt"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/stretchr/testify/require"
)

func TestConcurrencyLimiter(t *testing.T) {
t.Parallel()
re := require.New(t)
cl := newConcurrencyLimiter(10)
cl := NewConcurrencyLimiter(10)
for i := 0; i < 10; i++ {
re.True(cl.allow())
}
Expand All @@ -35,20 +40,89 @@ func TestConcurrencyLimiter(t *testing.T) {
re.Equal(uint64(10), cl.getMaxConcurrency())
cl.setLimit(5)
re.Equal(uint64(5), cl.getLimit())
re.Equal(uint64(10), cl.getCurrent())
re.Equal(uint64(10), cl.GetRunningTasksNum())
cl.release()
re.Equal(uint64(9), cl.getCurrent())
re.Equal(uint64(9), cl.GetRunningTasksNum())
for i := 0; i < 9; i++ {
cl.release()
}
re.Equal(uint64(10), cl.getMaxConcurrency())
for i := 0; i < 5; i++ {
re.True(cl.allow())
}
re.Equal(uint64(5), cl.getCurrent())
re.Equal(uint64(5), cl.GetRunningTasksNum())
for i := 0; i < 5; i++ {
cl.release()
}
re.Equal(uint64(5), cl.getMaxConcurrency())
re.Equal(uint64(0), cl.getMaxConcurrency())
}

func TestConcurrencyLimiter2(t *testing.T) {
limit := uint64(2)
limiter := NewConcurrencyLimiter(limit)

require.Equal(t, uint64(0), limiter.GetRunningTasksNum(), "Expected running tasks to be 0")
require.Equal(t, uint64(0), limiter.GetWaitingTasksNum(), "Expected waiting tasks to be 0")

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Acquire two tokens
token1, err := limiter.Acquire(ctx)
require.NoError(t, err, "Failed to acquire token")

token2, err := limiter.Acquire(ctx)
require.NoError(t, err, "Failed to acquire token")

require.Equal(t, limit, limiter.GetRunningTasksNum(), "Expected running tasks to be 2")

// Try to acquire third token, it should not be able to acquire immediately due to limit
go func() {
_, err := limiter.Acquire(ctx)
require.NoError(t, err, "Failed to acquire token")
}()

time.Sleep(100 * time.Millisecond) // Give some time for the goroutine to run
require.Equal(t, uint64(1), limiter.GetWaitingTasksNum(), "Expected waiting tasks to be 1")

// Release a token
token1.Release()
time.Sleep(100 * time.Millisecond) // Give some time for the goroutine to run
require.Equal(t, uint64(2), limiter.GetRunningTasksNum(), "Expected running tasks to be 2")
require.Equal(t, uint64(0), limiter.GetWaitingTasksNum(), "Expected waiting tasks to be 0")

// Release the second token
token2.Release()
time.Sleep(100 * time.Millisecond) // Give some time for the goroutine to run
require.Equal(t, uint64(1), limiter.GetRunningTasksNum(), "Expected running tasks to be 1")
}

func TestConcurrencyLimiterAcquire(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()

limiter := NewConcurrencyLimiter(20)
sum := int64(0)
start := time.Now()
wg := &sync.WaitGroup{}
wg.Add(100)
for i := 0; i < 100; i++ {
go func(i int) {
defer wg.Done()
token, err := limiter.Acquire(ctx)
if err != nil {
fmt.Printf("Task %d failed to acquire: %v\n", i, err)
return
}
defer token.Release()
// simulate takes some time
time.Sleep(10 * time.Millisecond)
atomic.AddInt64(&sum, 1)
}(i)
}
wg.Wait()
// We should have 20 tasks running concurrently, so it should take at least 50ms to complete
require.Greater(t, time.Since(start).Milliseconds(), int64(50))
require.Equal(t, int64(100), sum)
}
10 changes: 5 additions & 5 deletions pkg/ratelimit/limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,18 +36,18 @@ type DimensionConfig struct {

type limiter struct {
mu syncutil.RWMutex
concurrency *concurrencyLimiter
concurrency *ConcurrencyLimiter
rate *RateLimiter
}

func newLimiter() *limiter {
lim := &limiter{
concurrency: newConcurrencyLimiter(0),
concurrency: NewConcurrencyLimiter(0),
}
return lim
}

func (l *limiter) getConcurrencyLimiter() *concurrencyLimiter {
func (l *limiter) getConcurrencyLimiter() *ConcurrencyLimiter {
l.mu.RLock()
defer l.mu.RUnlock()
return l.concurrency
Expand Down Expand Up @@ -81,7 +81,7 @@ func (l *limiter) getQPSLimiterStatus() (limit rate.Limit, burst int) {
func (l *limiter) getConcurrencyLimiterStatus() (limit uint64, current uint64) {
baseLimiter := l.getConcurrencyLimiter()
if baseLimiter != nil {
return baseLimiter.getLimit(), baseLimiter.getCurrent()
return baseLimiter.getLimit(), baseLimiter.GetRunningTasksNum()
}
return 0, 0
}
Expand All @@ -101,7 +101,7 @@ func (l *limiter) updateConcurrencyConfig(limit uint64) UpdateStatus {
}
l.concurrency.setLimit(limit)
} else {
l.concurrency = newConcurrencyLimiter(limit)
l.concurrency = NewConcurrencyLimiter(limit)
}
return ConcurrencyChanged
}
Expand Down
Loading

0 comments on commit 35e8e95

Please sign in to comment.