Skip to content

Commit

Permalink
MegaFix global behavior bugs. (#225)
Browse files Browse the repository at this point in the history
* Fix global behavior `ResetTime` bug.

Every call to `GetRateLimits` would reset the `ResetTime` and not the `Remaining` counter.  This would cause counters to eventually deplete and never fully reset.

* Refine request time propagation.

Request time is resolved at the first call to `getLocalRateLimit()`, then propagated between peers for global behavior.

* Fix race condition in global behavior.

QueueUpdate() allowed sending both the request and the response when local rate limits were updated.  However, the order in which QueueUpdate() gets called is not guaranteed to be chronological.  This allowed stale updates to propagate, causing lost hits.
Instead, QueueUpdate() will only pass the request.  The current rate limit state will be retrieved immediately before propagation.

Rigorous functional tests added around global behavior.

* Fix intermittent test error caused by `TestHealthCheck`.

* Refactor global behavior and functional tests for stability.

- Simplify passing of request time across layers.
- Better handling of metrics in tests.
- Better detection of global broadcasts, global updates, and idle.
- Drop redundant metric `guberator_global_broadcast_counter`.
- Fix metric `gubernator_global_queue_length` for global broadcast.
- Add metric `gubernator_global_send_queue_length` for global send.

* Backwards compatibility needed for upgrading.

* Don't call `OnChange()` event from non-owner.

Non-owners shouldn't be persisting rate limit state.

* Simplify cache item expiration check.

* Rename `RequestTime` to `CreatedAt` in protos.
  • Loading branch information
Baliedge authored Mar 19, 2024
1 parent 63fd9ff commit 3982e50
Show file tree
Hide file tree
Showing 20 changed files with 1,212 additions and 559 deletions.
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ $(GOLANGCI_LINT): ## Download Go linter

.PHONY: lint
lint: $(GOLANGCI_LINT) ## Run Go linter
$(GOLANGCI_LINT) run -v --fix -c .golangci.yml ./...
$(GOLANGCI_LINT) run -v -c .golangci.yml ./...

.PHONY: test
test: ## Run unit tests and measure code coverage
Expand All @@ -24,7 +24,7 @@ test: ## Run unit tests and measure code coverage

.PHONY: bench
bench: ## Run Go benchmarks
go test ./... -bench . -benchtime 5s -timeout 0 -run=XXX -benchmem
go test ./... -bench . -benchtime 5s -timeout 0 -run='^$$' -benchmem

.PHONY: docker
docker: ## Build Docker image
Expand Down
87 changes: 49 additions & 38 deletions algorithms.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@ import (
// with 100 emails and the request will succeed. You can override this default behavior with `DRAIN_OVER_LIMIT`

// Implements token bucket algorithm for rate limiting. https://en.wikipedia.org/wiki/Token_bucket
func tokenBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *RateLimitResp, err error) {

func tokenBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq, reqState RateLimitReqState) (resp *RateLimitResp, err error) {
tokenBucketTimer := prometheus.NewTimer(metricFuncTimeDuration.WithLabelValues("tokenBucket"))
defer tokenBucketTimer.ObserveDuration()

Expand Down Expand Up @@ -100,7 +99,7 @@ func tokenBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *
s.Remove(ctx, hashKey)
}

return tokenBucketNewItem(ctx, s, c, r)
return tokenBucketNewItem(ctx, s, c, r, reqState)
}

// Update the limit if it changed.
Expand Down Expand Up @@ -133,12 +132,12 @@ func tokenBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *
}

// If our new duration means we are currently expired.
now := MillisecondNow()
if expire <= now {
createdAt := *r.CreatedAt
if expire <= createdAt {
// Renew item.
span.AddEvent("Limit has expired")
expire = now + r.Duration
t.CreatedAt = now
expire = createdAt + r.Duration
t.CreatedAt = createdAt
t.Remaining = t.Limit
}

Expand All @@ -147,7 +146,7 @@ func tokenBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *
rl.ResetTime = expire
}

if s != nil {
if s != nil && reqState.IsOwner {
defer func() {
s.OnChange(ctx, r, item)
}()
Expand All @@ -162,7 +161,9 @@ func tokenBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *
// If we are already at the limit.
if rl.Remaining == 0 && r.Hits > 0 {
trace.SpanFromContext(ctx).AddEvent("Already over the limit")
metricOverLimitCounter.Add(1)
if reqState.IsOwner {
metricOverLimitCounter.Add(1)
}
rl.Status = Status_OVER_LIMIT
t.Status = rl.Status
return rl, nil
Expand All @@ -180,7 +181,9 @@ func tokenBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *
// without updating the cache.
if r.Hits > t.Remaining {
trace.SpanFromContext(ctx).AddEvent("Over the limit")
metricOverLimitCounter.Add(1)
if reqState.IsOwner {
metricOverLimitCounter.Add(1)
}
rl.Status = Status_OVER_LIMIT
if HasBehavior(r.Behavior, Behavior_DRAIN_OVER_LIMIT) {
// DRAIN_OVER_LIMIT behavior drains the remaining counter.
Expand All @@ -196,19 +199,19 @@ func tokenBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *
}

// Item is not found in cache or store, create new.
return tokenBucketNewItem(ctx, s, c, r)
return tokenBucketNewItem(ctx, s, c, r, reqState)
}

// Called by tokenBucket() when adding a new item in the store.
func tokenBucketNewItem(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *RateLimitResp, err error) {
now := MillisecondNow()
expire := now + r.Duration
func tokenBucketNewItem(ctx context.Context, s Store, c Cache, r *RateLimitReq, reqState RateLimitReqState) (resp *RateLimitResp, err error) {
createdAt := *r.CreatedAt
expire := createdAt + r.Duration

t := &TokenBucketItem{
Limit: r.Limit,
Duration: r.Duration,
Remaining: r.Limit - r.Hits,
CreatedAt: now,
CreatedAt: createdAt,
}

// Add a new rate limit to the cache.
Expand Down Expand Up @@ -236,31 +239,33 @@ func tokenBucketNewItem(ctx context.Context, s Store, c Cache, r *RateLimitReq)
// Client could be requesting that we always return OVER_LIMIT.
if r.Hits > r.Limit {
trace.SpanFromContext(ctx).AddEvent("Over the limit")
metricOverLimitCounter.Add(1)
if reqState.IsOwner {
metricOverLimitCounter.Add(1)
}
rl.Status = Status_OVER_LIMIT
rl.Remaining = r.Limit
t.Remaining = r.Limit
}

c.Add(item)

if s != nil {
if s != nil && reqState.IsOwner {
s.OnChange(ctx, r, item)
}

return rl, nil
}

// Implements leaky bucket algorithm for rate limiting https://en.wikipedia.org/wiki/Leaky_bucket
func leakyBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *RateLimitResp, err error) {
func leakyBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq, reqState RateLimitReqState) (resp *RateLimitResp, err error) {
leakyBucketTimer := prometheus.NewTimer(metricFuncTimeDuration.WithLabelValues("V1Instance.getRateLimit_leakyBucket"))
defer leakyBucketTimer.ObserveDuration()

if r.Burst == 0 {
r.Burst = r.Limit
}

now := MillisecondNow()
createdAt := *r.CreatedAt

// Get rate limit from cache.
hashKey := r.HashKey()
Expand Down Expand Up @@ -309,7 +314,7 @@ func leakyBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *
s.Remove(ctx, hashKey)
}

return leakyBucketNewItem(ctx, s, c, r)
return leakyBucketNewItem(ctx, s, c, r, reqState)
}

if HasBehavior(r.Behavior, Behavior_RESET_REMAINING) {
Expand Down Expand Up @@ -349,16 +354,16 @@ func leakyBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *
}

if r.Hits != 0 {
c.UpdateExpiration(r.HashKey(), now+duration)
c.UpdateExpiration(r.HashKey(), createdAt+duration)
}

// Calculate how much leaked out of the bucket since the last time we leaked a hit
elapsed := now - b.UpdatedAt
elapsed := createdAt - b.UpdatedAt
leak := float64(elapsed) / rate

if int64(leak) > 0 {
b.Remaining += leak
b.UpdatedAt = now
b.UpdatedAt = createdAt
}

if int64(b.Remaining) > b.Burst {
Expand All @@ -369,20 +374,22 @@ func leakyBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *
Limit: b.Limit,
Remaining: int64(b.Remaining),
Status: Status_UNDER_LIMIT,
ResetTime: now + (b.Limit-int64(b.Remaining))*int64(rate),
ResetTime: createdAt + (b.Limit-int64(b.Remaining))*int64(rate),
}

// TODO: Feature missing: check for Duration change between item/request.

if s != nil {
if s != nil && reqState.IsOwner {
defer func() {
s.OnChange(ctx, r, item)
}()
}

// If we are already at the limit
if int64(b.Remaining) == 0 && r.Hits > 0 {
metricOverLimitCounter.Add(1)
if reqState.IsOwner {
metricOverLimitCounter.Add(1)
}
rl.Status = Status_OVER_LIMIT
return rl, nil
}
Expand All @@ -391,14 +398,16 @@ func leakyBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *
if int64(b.Remaining) == r.Hits {
b.Remaining = 0
rl.Remaining = int64(b.Remaining)
rl.ResetTime = now + (rl.Limit-rl.Remaining)*int64(rate)
rl.ResetTime = createdAt + (rl.Limit-rl.Remaining)*int64(rate)
return rl, nil
}

// If requested is more than available, then return over the limit
// without updating the bucket, unless `DRAIN_OVER_LIMIT` is set.
if r.Hits > int64(b.Remaining) {
metricOverLimitCounter.Add(1)
if reqState.IsOwner {
metricOverLimitCounter.Add(1)
}
rl.Status = Status_OVER_LIMIT

// DRAIN_OVER_LIMIT behavior drains the remaining counter.
Expand All @@ -417,16 +426,16 @@ func leakyBucket(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *

b.Remaining -= float64(r.Hits)
rl.Remaining = int64(b.Remaining)
rl.ResetTime = now + (rl.Limit-rl.Remaining)*int64(rate)
rl.ResetTime = createdAt + (rl.Limit-rl.Remaining)*int64(rate)
return rl, nil
}

return leakyBucketNewItem(ctx, s, c, r)
return leakyBucketNewItem(ctx, s, c, r, reqState)
}

// Called by leakyBucket() when adding a new item in the store.
func leakyBucketNewItem(ctx context.Context, s Store, c Cache, r *RateLimitReq) (resp *RateLimitResp, err error) {
now := MillisecondNow()
func leakyBucketNewItem(ctx context.Context, s Store, c Cache, r *RateLimitReq, reqState RateLimitReqState) (resp *RateLimitResp, err error) {
createdAt := *r.CreatedAt
duration := r.Duration
rate := float64(duration) / float64(r.Limit)
if HasBehavior(r.Behavior, Behavior_DURATION_IS_GREGORIAN) {
Expand All @@ -445,36 +454,38 @@ func leakyBucketNewItem(ctx context.Context, s Store, c Cache, r *RateLimitReq)
Remaining: float64(r.Burst - r.Hits),
Limit: r.Limit,
Duration: duration,
UpdatedAt: now,
UpdatedAt: createdAt,
Burst: r.Burst,
}

rl := RateLimitResp{
Status: Status_UNDER_LIMIT,
Limit: b.Limit,
Remaining: r.Burst - r.Hits,
ResetTime: now + (b.Limit-(r.Burst-r.Hits))*int64(rate),
ResetTime: createdAt + (b.Limit-(r.Burst-r.Hits))*int64(rate),
}

// Client could be requesting that we start with the bucket OVER_LIMIT
if r.Hits > r.Burst {
metricOverLimitCounter.Add(1)
if reqState.IsOwner {
metricOverLimitCounter.Add(1)
}
rl.Status = Status_OVER_LIMIT
rl.Remaining = 0
rl.ResetTime = now + (rl.Limit-rl.Remaining)*int64(rate)
rl.ResetTime = createdAt + (rl.Limit-rl.Remaining)*int64(rate)
b.Remaining = 0
}

item := &CacheItem{
ExpireAt: now + duration,
ExpireAt: createdAt + duration,
Algorithm: r.Algorithm,
Key: r.HashKey(),
Value: &b,
}

c.Add(item)

if s != nil {
if s != nil && reqState.IsOwner {
s.OnChange(ctx, r, item)
}

Expand Down
Loading

0 comments on commit 3982e50

Please sign in to comment.