Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pkg: delete config when QPS and concurrency are both deleted #8653

Merged
merged 8 commits into from
Sep 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 16 additions & 16 deletions pkg/ratelimit/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func TestControllerWithConcurrencyLimiter(t *testing.T) {
opt: UpdateConcurrencyLimiter(10),
checkOptionStatus: func(label string, o Option) {
status := limiter.Update(label, o)
re.NotZero(status & ConcurrencyChanged)
re.NotZero(status & LimiterUpdated)
},
totalRequest: 15,
fail: 5,
Expand All @@ -106,15 +106,15 @@ func TestControllerWithConcurrencyLimiter(t *testing.T) {
opt: UpdateConcurrencyLimiter(10),
checkOptionStatus: func(label string, o Option) {
status := limiter.Update(label, o)
re.NotZero(status & ConcurrencyNoChange)
re.NotZero(status & LimiterNotChanged)
},
checkStatusFunc: func(_ string) {},
},
{
opt: UpdateConcurrencyLimiter(5),
checkOptionStatus: func(label string, o Option) {
status := limiter.Update(label, o)
re.NotZero(status & ConcurrencyChanged)
re.NotZero(status & LimiterUpdated)
},
totalRequest: 15,
fail: 10,
Expand All @@ -131,7 +131,7 @@ func TestControllerWithConcurrencyLimiter(t *testing.T) {
opt: UpdateConcurrencyLimiter(0),
checkOptionStatus: func(label string, o Option) {
status := limiter.Update(label, o)
re.NotZero(status & ConcurrencyDeleted)
re.NotZero(status & LimiterDeleted)
},
totalRequest: 15,
fail: 0,
Expand All @@ -153,7 +153,7 @@ func TestControllerWithConcurrencyLimiter(t *testing.T) {
opt: UpdateConcurrencyLimiter(15),
checkOptionStatus: func(label string, o Option) {
status := limiter.Update(label, o)
re.NotZero(status & ConcurrencyChanged)
re.NotZero(status & LimiterUpdated)
},
totalRequest: 10,
fail: 0,
Expand All @@ -170,7 +170,7 @@ func TestControllerWithConcurrencyLimiter(t *testing.T) {
opt: UpdateConcurrencyLimiter(10),
checkOptionStatus: func(label string, o Option) {
status := limiter.Update(label, o)
re.NotZero(status & ConcurrencyChanged)
re.NotZero(status & LimiterUpdated)
},
totalRequest: 10,
fail: 10,
Expand Down Expand Up @@ -222,7 +222,7 @@ func TestControllerWithQPSLimiter(t *testing.T) {
opt: UpdateQPSLimiter(float64(rate.Every(time.Second)), 1),
checkOptionStatus: func(label string, o Option) {
status := limiter.Update(label, o)
re.NotZero(status & QPSChanged)
re.NotZero(status & LimiterUpdated)
},
totalRequest: 3,
fail: 2,
Expand All @@ -238,15 +238,15 @@ func TestControllerWithQPSLimiter(t *testing.T) {
opt: UpdateQPSLimiter(float64(rate.Every(time.Second)), 1),
checkOptionStatus: func(label string, o Option) {
status := limiter.Update(label, o)
re.NotZero(status & QPSNoChange)
re.NotZero(status & LimiterNotChanged)
},
checkStatusFunc: func(_ string) {},
},
{
opt: UpdateQPSLimiter(5, 5),
checkOptionStatus: func(label string, o Option) {
status := limiter.Update(label, o)
re.NotZero(status & QPSChanged)
re.NotZero(status & LimiterUpdated)
},
totalRequest: 10,
fail: 5,
Expand All @@ -262,7 +262,7 @@ func TestControllerWithQPSLimiter(t *testing.T) {
opt: UpdateQPSLimiter(0, 0),
checkOptionStatus: func(label string, o Option) {
status := limiter.Update(label, o)
re.NotZero(status & QPSDeleted)
re.NotZero(status & LimiterDeleted)
},
totalRequest: 10,
fail: 0,
Expand All @@ -284,7 +284,7 @@ func TestControllerWithQPSLimiter(t *testing.T) {
opt: UpdateQPSLimiter(50, 5),
checkOptionStatus: func(label string, o Option) {
status := limiter.Update(label, o)
re.NotZero(status & QPSChanged)
re.NotZero(status & LimiterUpdated)
},
totalRequest: 10,
fail: 5,
Expand All @@ -300,7 +300,7 @@ func TestControllerWithQPSLimiter(t *testing.T) {
opt: UpdateQPSLimiter(0, 0),
checkOptionStatus: func(label string, o Option) {
status := limiter.Update(label, o)
re.NotZero(status & QPSDeleted)
re.NotZero(status & LimiterDeleted)
},
totalRequest: 10,
fail: 0,
Expand Down Expand Up @@ -335,7 +335,7 @@ func TestControllerWithTwoLimiters(t *testing.T) {
}),
checkOptionStatus: func(label string, o Option) {
status := limiter.Update(label, o)
re.NotZero(status & QPSChanged)
re.NotZero(status & LimiterUpdated)
},
totalRequest: 200,
fail: 100,
Expand All @@ -355,7 +355,7 @@ func TestControllerWithTwoLimiters(t *testing.T) {
opt: UpdateQPSLimiter(float64(rate.Every(time.Second)), 1),
checkOptionStatus: func(label string, o Option) {
status := limiter.Update(label, o)
re.NotZero(status & QPSChanged)
re.NotZero(status & LimiterUpdated)
},
totalRequest: 200,
fail: 199,
Expand All @@ -377,7 +377,7 @@ func TestControllerWithTwoLimiters(t *testing.T) {
opt: UpdateQPSLimiter(50, 5),
checkOptionStatus: func(label string, o Option) {
status := limiter.Update(label, o)
re.NotZero(status & QPSChanged)
re.NotZero(status & LimiterUpdated)
},
totalRequest: 10,
fail: 5,
Expand All @@ -393,7 +393,7 @@ func TestControllerWithTwoLimiters(t *testing.T) {
opt: UpdateQPSLimiter(0, 0),
checkOptionStatus: func(label string, o Option) {
status := limiter.Update(label, o)
re.NotZero(status & QPSDeleted)
re.NotZero(status & LimiterDeleted)
},
totalRequest: 10,
fail: 0,
Expand Down
39 changes: 18 additions & 21 deletions pkg/ratelimit/limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,15 +59,8 @@ func (l *limiter) getRateLimiter() *RateLimiter {
return l.rate
}

func (l *limiter) deleteRateLimiter() bool {
l.mu.Lock()
defer l.mu.Unlock()
l.rate = nil
return l.isEmpty()
}

func (l *limiter) isEmpty() bool {
return l.concurrency == nil && l.rate == nil
return (l.concurrency == nil || l.concurrency.limit == 0) && l.rate == nil
}

func (l *limiter) getQPSLimiterStatus() (limit rate.Limit, burst int) {
Expand All @@ -89,47 +82,51 @@ func (l *limiter) getConcurrencyLimiterStatus() (limit uint64, current uint64) {
func (l *limiter) updateConcurrencyConfig(limit uint64) UpdateStatus {
oldConcurrencyLimit, _ := l.getConcurrencyLimiterStatus()
if oldConcurrencyLimit == limit {
return ConcurrencyNoChange
return LimiterNotChanged
}

l.mu.Lock()
defer l.mu.Unlock()
if l.concurrency != nil {
if limit < 1 {
l.concurrency.setLimit(0)
return ConcurrencyDeleted
l.concurrency = NewConcurrencyLimiter(0)
if l.isEmpty() {
return LimiterDeleted
}
return LimiterUpdated
}
l.concurrency.setLimit(limit)
} else {
l.concurrency = NewConcurrencyLimiter(limit)
}
return ConcurrencyChanged
return LimiterUpdated
}

func (l *limiter) updateQPSConfig(limit float64, burst int) UpdateStatus {
oldQPSLimit, oldBurst := l.getQPSLimiterStatus()
if math.Abs(float64(oldQPSLimit)-limit) < eps && oldBurst == burst {
return QPSNoChange
}
if limit <= eps || burst < 1 {
l.deleteRateLimiter()
return QPSDeleted
return LimiterNotChanged
}
l.mu.Lock()
defer l.mu.Unlock()
if l.rate != nil {
if limit <= eps || burst < 1 {
l.rate = nil
if l.isEmpty() {
return LimiterDeleted
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be LimiterNoChanged?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, LimiterDeleted indicates that we should remove the corresponding label from the config.

}
return LimiterUpdated
}
l.rate.SetLimit(rate.Limit(limit))
l.rate.SetBurst(burst)
} else {
l.rate = NewRateLimiter(limit, burst)
}
return QPSChanged
return LimiterUpdated
}

func (l *limiter) updateDimensionConfig(cfg *DimensionConfig) UpdateStatus {
status := l.updateQPSConfig(cfg.QPS, cfg.QPSBurst)
status |= l.updateConcurrencyConfig(cfg.ConcurrencyLimit)
return status
return l.updateQPSConfig(cfg.QPS, cfg.QPSBurst) | l.updateConcurrencyConfig(cfg.ConcurrencyLimit)
}

func (l *limiter) allow() (DoneFunc, error) {
Expand Down
28 changes: 14 additions & 14 deletions pkg/ratelimit/limiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func TestWithConcurrencyLimiter(t *testing.T) {

limiter := newLimiter()
status := limiter.updateConcurrencyConfig(10)
re.NotZero(status & ConcurrencyChanged)
re.NotZero(status & LimiterUpdated)
var lock syncutil.Mutex
successCount, failedCount := 0, 0
var wg sync.WaitGroup
Expand All @@ -67,10 +67,10 @@ func TestWithConcurrencyLimiter(t *testing.T) {
re.Equal(uint64(0), current)

status = limiter.updateConcurrencyConfig(10)
re.NotZero(status & ConcurrencyNoChange)
re.NotZero(status & LimiterNotChanged)

status = limiter.updateConcurrencyConfig(5)
re.NotZero(status & ConcurrencyChanged)
re.NotZero(status & LimiterUpdated)
failedCount = 0
successCount = 0
for i := 0; i < 15; i++ {
Expand All @@ -85,7 +85,7 @@ func TestWithConcurrencyLimiter(t *testing.T) {
}

status = limiter.updateConcurrencyConfig(0)
re.NotZero(status & ConcurrencyDeleted)
re.NotZero(status & LimiterDeleted)
failedCount = 0
successCount = 0
for i := 0; i < 15; i++ {
Expand All @@ -105,7 +105,7 @@ func TestWithQPSLimiter(t *testing.T) {
re := require.New(t)
limiter := newLimiter()
status := limiter.updateQPSConfig(float64(rate.Every(time.Second)), 1)
re.NotZero(status & QPSChanged)
re.NotZero(status & LimiterUpdated)

var lock syncutil.Mutex
successCount, failedCount := 0, 0
Expand All @@ -124,10 +124,10 @@ func TestWithQPSLimiter(t *testing.T) {
re.Equal(1, burst)

status = limiter.updateQPSConfig(float64(rate.Every(time.Second)), 1)
re.NotZero(status & QPSNoChange)
re.NotZero(status & LimiterNotChanged)

status = limiter.updateQPSConfig(5, 5)
re.NotZero(status & QPSChanged)
re.NotZero(status & LimiterUpdated)
limit, burst = limiter.getQPSLimiterStatus()
re.Equal(rate.Limit(5), limit)
re.Equal(5, burst)
Expand All @@ -145,7 +145,7 @@ func TestWithQPSLimiter(t *testing.T) {
time.Sleep(time.Second)

status = limiter.updateQPSConfig(0, 0)
re.NotZero(status & QPSDeleted)
re.NotZero(status & LimiterDeleted)
for i := 0; i < 10; i++ {
_, err := limiter.allow()
re.NoError(err)
Expand All @@ -157,7 +157,7 @@ func TestWithQPSLimiter(t *testing.T) {
successCount = 0
failedCount = 0
status = limiter.updateQPSConfig(float64(rate.Every(3*time.Second)), 100)
re.NotZero(status & QPSChanged)
re.NotZero(status & LimiterUpdated)
wg.Add(200)
for i := 0; i < 200; i++ {
go countSingleLimiterHandleResult(limiter, &successCount, &failedCount, &lock, &wg, r)
Expand All @@ -183,8 +183,8 @@ func TestWithTwoLimiters(t *testing.T) {
}
limiter := newLimiter()
status := limiter.updateDimensionConfig(cfg)
re.NotZero(status & QPSChanged)
re.NotZero(status & ConcurrencyChanged)
re.NotZero(status & LimiterUpdated)
re.NotZero(status & LimiterUpdated)

var lock syncutil.Mutex
successCount, failedCount := 0, 0
Expand All @@ -211,7 +211,7 @@ func TestWithTwoLimiters(t *testing.T) {
r.release()
}
status = limiter.updateQPSConfig(float64(rate.Every(10*time.Second)), 1)
re.NotZero(status & QPSChanged)
re.NotZero(status & LimiterUpdated)
wg.Add(100)
for i := 0; i < 100; i++ {
go countSingleLimiterHandleResult(limiter, &successCount, &failedCount, &lock, &wg, r)
Expand All @@ -225,8 +225,8 @@ func TestWithTwoLimiters(t *testing.T) {

cfg = &DimensionConfig{}
status = limiter.updateDimensionConfig(cfg)
re.NotZero(status & ConcurrencyDeleted)
re.NotZero(status & QPSDeleted)
re.NotZero(status & LimiterDeleted)
re.NotZero(status & LimiterDeleted)
}

func countSingleLimiterHandleResult(limiter *limiter, successCount *int,
Expand Down
28 changes: 11 additions & 17 deletions pkg/ratelimit/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,12 @@ type UpdateStatus uint32
// Flags for limiter.
const (
eps float64 = 1e-8
// QPSNoChange shows that limiter's config isn't changed.
QPSNoChange UpdateStatus = 1 << iota
// QPSChanged shows that limiter's config is changed and not deleted.
QPSChanged
// QPSDeleted shows that limiter's config is deleted.
QPSDeleted
// ConcurrencyNoChange shows that limiter's config isn't changed.
ConcurrencyNoChange
// ConcurrencyChanged shows that limiter's config is changed and not deleted.
ConcurrencyChanged
// ConcurrencyDeleted shows that limiter's config is deleted.
ConcurrencyDeleted

LimiterNotChanged UpdateStatus = 1 << iota
// LimiterUpdated shows that limiter's config is updated.
LimiterUpdated
// LimiterDeleted shows that limiter's config is deleted.
LimiterDeleted
// InAllowList shows that limiter's config isn't changed because it is in in allow list.
InAllowList
)
Expand All @@ -45,7 +39,7 @@ type Option func(string, *Controller) UpdateStatus
func AddLabelAllowList() Option {
return func(label string, l *Controller) UpdateStatus {
l.labelAllowList[label] = struct{}{}
return 0
return InAllowList
}
}

Expand Down Expand Up @@ -73,11 +67,11 @@ func UpdateQPSLimiter(limit float64, burst int) Option {

// UpdateDimensionConfig creates QPS limiter and concurrency limiter for a given label by config if it doesn't exist.
func UpdateDimensionConfig(cfg *DimensionConfig) Option {
return func(label string, l *Controller) UpdateStatus {
if _, allow := l.labelAllowList[label]; allow {
return func(label string, c *Controller) UpdateStatus {
if _, allow := c.labelAllowList[label]; allow {
return InAllowList
}
lim, _ := l.limiters.LoadOrStore(label, newLimiter())
lim, _ := c.limiters.LoadOrStore(label, newLimiter())
return lim.(*limiter).updateDimensionConfig(cfg)
}
}
Expand All @@ -89,6 +83,6 @@ func InitLimiter() Option {
return InAllowList
}
l.limiters.LoadOrStore(label, newLimiter())
return ConcurrencyChanged
return LimiterNotChanged
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Has there been a change here, as expected?

Copy link
Member Author

@rleungx rleungx Sep 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the limit is not changed when we initialize the limiter.

}
}
Loading