Skip to content

Commit

Permalink
controller: fix limiter cannot work well in high concurrency scenario
Browse files Browse the repository at this point in the history
Signed-off-by: nolouch <[email protected]>
  • Loading branch information
nolouch committed Jul 24, 2024
1 parent 9f5522e commit f267149
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 7 deletions.
8 changes: 6 additions & 2 deletions client/resource_group/controller/limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,9 @@ func (lim *Limiter) reserveN(now time.Time, n float64, maxFutureReserve time.Dur
}
// Update state
if ok {
lim.last = now
if lim.last.Before(now) {
lim.last = now
}
lim.tokens = tokens
lim.maybeNotify()
} else {
Expand All @@ -424,7 +426,9 @@ func (lim *Limiter) reserveN(now time.Time, n float64, maxFutureReserve time.Dur
zap.Int("remaining-notify-times", lim.remainingNotifyTimes),
zap.String("name", lim.name))
}
lim.last = last
if lim.last.Before(now) {
lim.last = last
}
if lim.limit == 0 {
lim.notify()
} else if lim.remainingNotifyTimes > 0 {
Expand Down
9 changes: 4 additions & 5 deletions client/resource_group/controller/limiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,20 +222,20 @@ func TestQPS(t *testing.T) {
reserveN int64
RU_PER_SEC int64
}{
{10000, 10, 400000},
{1000, 10, 400000},
}

for _, tc := range cases {
t.Run(fmt.Sprintf("concurrency=%d,reserveN=%d,limit=%d", tc.concurrency, tc.reserveN, tc.RU_PER_SEC), func(t *testing.T) {
qps, ruSec, waitTime := testQPSCase(tc.concurrency, tc.reserveN, tc.RU_PER_SEC)
t.Log(fmt.Printf("QPS: %.2f, RU: %.2f, new request need wait %s\n", qps, ruSec, waitTime))
re.LessOrEqual(math.Abs(float64(tc.RU_PER_SEC)-ruSec), float64(1))
re.LessOrEqual(math.Abs(float64(tc.RU_PER_SEC)/float64(tc.reserveN)-qps), float64(1))
re.LessOrEqual(math.Abs(float64(tc.RU_PER_SEC)-ruSec), float64(10)*float64(tc.reserveN))
re.LessOrEqual(math.Abs(float64(tc.RU_PER_SEC)/float64(tc.reserveN)-qps), float64(10))
})
}
}

const testCaseRunTime = 3 * time.Second
const testCaseRunTime = 4 * time.Second

func testQPSCase(concurrency int, reserveN int64, limit int64) (float64, float64, time.Duration) {
nc := make(chan notifyMsg, 1)
Expand Down Expand Up @@ -282,7 +282,6 @@ func testQPSCase(concurrency int, reserveN int64, limit int64) (float64, float64
windowRequests = atomic.SwapInt64(&totalRequests, 0)
qps = float64(windowRequests)
r := lim.Reserve(ctx, 30*time.Second, time.Now(), float64(reserveN))
fmt.Printf("%s: QPS: %.2f, RU: %.2f, new request need wait %s\n", time.Now(), qps, qps*float64(reserveN), wait)
wait = r.Delay()
time.Sleep(1 * time.Second)
}
Expand Down

0 comments on commit f267149

Please sign in to comment.