Fixed race conditions in leakybucket
thrawn01 committed May 14, 2024
1 parent 8f41432 commit 1954bed
Showing 8 changed files with 131 additions and 198 deletions.
219 changes: 116 additions & 103 deletions algorithms.go

Large diffs are not rendered by default.

1 change: 0 additions & 1 deletion cache.go
@@ -20,7 +20,6 @@ import "sync"

type Cache interface {
Add(item *CacheItem) bool
-UpdateExpiration(key string, expireAt int64) bool
GetItem(key string) (value *CacheItem, ok bool)
Each() chan *CacheItem
Remove(key string)
19 changes: 12 additions & 7 deletions cache_manager.go
@@ -59,20 +59,25 @@ func (m *cacheManager) GetRateLimit(ctx context.Context, req *RateLimitReq, stat
switch req.Algorithm {
case Algorithm_TOKEN_BUCKET:
rlResponse, err = tokenBucket(rateContext{
-Store: m.conf.Store,
-Cache: m.cache,
-ReqState: state,
-Request: req,
-Context: ctx,
-InstanceID: m.conf.InstanceID,
+Store: m.conf.Store,
+Cache: m.cache,
+ReqState: state,
+Request: req,
+Context: ctx,
})
if err != nil {
msg := "Error in tokenBucket"
countError(err, msg)
}

case Algorithm_LEAKY_BUCKET:
-rlResponse, err = leakyBucket(ctx, m.conf.Store, m.cache, req, state)
+rlResponse, err = leakyBucket(rateContext{
+Store: m.conf.Store,
+Cache: m.cache,
+ReqState: state,
+Request: req,
+Context: ctx,
+})
if err != nil {
msg := "Error in leakyBucket"
countError(err, msg)
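
Both call sites now hand their dependencies to the algorithm through a single rateContext value instead of a positional argument list. The struct itself is defined in algorithms.go, whose diff is not rendered above, so the snippet below is only a self-contained sketch of the parameter-object pattern the call sites imply; the placeholder types stand in for the package's real Store, Cache, and request types.

    package main

    import (
        "context"
        "fmt"
    )

    // Placeholder types; the real Store, Cache, and RateLimitReq live in the
    // gubernator package and are not reproduced here.
    type store struct{}
    type cache struct{}
    type request struct{ UniqueKey string }

    // rateContext mirrors the shape used at the call sites in cache_manager.go:
    // one struct carries everything an algorithm needs.
    type rateContext struct {
        Context context.Context
        Store   *store
        Cache   *cache
        Request *request
    }

    // leakyBucket is a stand-in for the real algorithm; the point is only that
    // new dependencies can be added to rateContext without changing this
    // signature or any caller.
    func leakyBucket(rctx rateContext) (string, error) {
        return "processed " + rctx.Request.UniqueKey, nil
    }

    func main() {
        out, _ := leakyBucket(rateContext{
            Context: context.Background(),
            Store:   &store{},
            Cache:   &cache{},
            Request: &request{UniqueKey: "account_123"},
        })
        fmt.Println(out)
    }

Compared with the old leakyBucket(ctx, store, cache, req, state) signature, the struct form keeps tokenBucket and leakyBucket symmetrical and lets new state be threaded through both without touching every caller.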
20 changes: 0 additions & 20 deletions lrucache.go
@@ -119,13 +119,6 @@ func (c *LRUCache) GetItem(key string) (item *CacheItem, ok bool) {
if ele, hit := c.cache[key]; hit {
entry := ele.Value.(*CacheItem)

-// TODO(thrawn01): Remove
-//if entry.IsExpired() {
-// c.removeElement(ele)
-// metricCacheAccess.WithLabelValues("miss").Add(1)
-// return
-//}

metricCacheAccess.WithLabelValues("hit").Add(1)
c.ll.MoveToFront(ele)
return entry, true
@@ -168,19 +161,6 @@ func (c *LRUCache) Size() int64 {
return atomic.LoadInt64(&c.cacheLen)
}

-// UpdateExpiration updates the expiration time for the key
-func (c *LRUCache) UpdateExpiration(key string, expireAt int64) bool {
-c.mu.Lock()
-defer c.mu.Unlock()
-
-if ele, hit := c.cache[key]; hit {
-entry := ele.Value.(*CacheItem)
-entry.ExpireAt = expireAt
-return true
-}
-return false
-}

func (c *LRUCache) Close() error {
c.cache = nil
c.ll = nil
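
UpdateExpiration mutated ExpireAt on a *CacheItem that other goroutines could be reading at the same time, which is one plausible source of the races this commit removes (the authoritative fix is in algorithms.go, which is not shown). The sketch below is self-contained and only illustrates that pattern; the entry type is hypothetical, with just the ExpireAt field borrowed from CacheItem. Running it with go run -race flags the unsynchronized read/write.

    package main

    import (
        "sync"
        "time"
    )

    // entry mimics a cache item whose expiration is updated in place.
    type entry struct {
        ExpireAt int64
    }

    func main() {
        e := &entry{ExpireAt: time.Now().UnixMilli()}
        var wg sync.WaitGroup
        wg.Add(2)

        // Reader: repeatedly checks expiry, as a GetItem-style lookup might.
        go func() {
            defer wg.Done()
            for i := 0; i < 1000; i++ {
                _ = e.ExpireAt < time.Now().UnixMilli()
            }
        }()

        // Writer: pushes the expiration forward, as UpdateExpiration did.
        go func() {
            defer wg.Done()
            for i := 0; i < 1000; i++ {
                e.ExpireAt = time.Now().Add(time.Minute).UnixMilli()
            }
        }()

        wg.Wait() // `go run -race` reports the concurrent access to e.ExpireAt
    }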
5 changes: 0 additions & 5 deletions mock_cache_test.go
@@ -34,11 +34,6 @@ func (m *MockCache) Add(item *guber.CacheItem) bool {
return args.Bool(0)
}

-func (m *MockCache) UpdateExpiration(key string, expireAt int64) bool {
-args := m.Called(key, expireAt)
-return args.Bool(0)
-}

func (m *MockCache) GetItem(key string) (value *guber.CacheItem, ok bool) {
args := m.Called(key)
retval, _ := args.Get(0).(*guber.CacheItem)
23 changes: 0 additions & 23 deletions otter.go
@@ -50,33 +50,10 @@ func (o *OtterCache) GetItem(key string) (*CacheItem, bool) {
return nil, false
}

-// TODO(thrawn01): Remove
-//if item.IsExpired() {
-// metricCacheAccess.WithLabelValues("miss").Add(1)
-// // If the item is expired, just return `nil`
-// //
-// // We avoid the explicit deletion of the expired item to avoid acquiring a mutex lock in otter.
-// // Explicit deletions in otter require a mutex, which can cause performance bottlenecks
-// // under high concurrency scenarios. By allowing the item to be evicted naturally by
-// // otter's eviction mechanism, we avoid impacting performance under high concurrency.
-// return nil, false
-//}
metricCacheAccess.WithLabelValues("hit").Add(1)
return item, true
}

-// UpdateExpiration will update an item in the cache with a new expiration date.
-// returns true if the item exists in the cache and was updated.
-func (o *OtterCache) UpdateExpiration(key string, expireAt int64) bool {
-item, ok := o.cache.Get(key)
-if !ok {
-return false
-}
-
-item.ExpireAt = expireAt
-return true
-}

// Each returns a channel which the call can use to iterate through
// all the items in the cache.
func (o *OtterCache) Each() chan *CacheItem {
37 changes: 0 additions & 37 deletions store.go
@@ -18,7 +18,6 @@ package gubernator

import (
"context"
"sync"
)

// PERSISTENT STORE DETAILS
@@ -30,7 +29,6 @@ import (
// Both interfaces can be implemented simultaneously to ensure data is always saved to persistent storage.

type LeakyBucketItem struct {
-mutex sync.Mutex
Limit int64
Duration int64
Remaining float64
@@ -81,41 +79,6 @@ type Loader interface {
Save(chan *CacheItem) error
}

-// TODO Remove
-//func NewMockStore() *MockStore {
-// ml := &MockStore{
-// Called: make(map[string]int),
-// CacheItems: make(map[string]*CacheItem),
-// }
-// ml.Called["OnChange()"] = 0
-// ml.Called["Remove()"] = 0
-// ml.Called["Get()"] = 0
-// return ml
-//}
-//
-//type MockStore struct {
-// Called map[string]int
-// CacheItems map[string]*CacheItem
-//}
-//
-//var _ Store = &MockStore{}
-//
-//func (ms *MockStore) OnChange(ctx context.Context, r *RateLimitReq, item *CacheItem) {
-// ms.Called["OnChange()"] += 1
-// ms.CacheItems[item.Key] = item
-//}
-//
-//func (ms *MockStore) Get(ctx context.Context, r *RateLimitReq) (*CacheItem, bool) {
-// ms.Called["Get()"] += 1
-// item, ok := ms.CacheItems[r.HashKey()]
-// return item, ok
-//}
-//
-//func (ms *MockStore) Remove(ctx context.Context, key string) {
-// ms.Called["Remove()"] += 1
-// delete(ms.CacheItems, key)
-//}

func NewMockLoader() *MockLoader {
ml := &MockLoader{
Called: make(map[string]int),
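
The other notable deletion in this file is the mutex sync.Mutex field that LeakyBucketItem used to embed. The real synchronization now happens elsewhere (algorithms.go is not rendered here), but one general hazard of carrying a lock inside a plain data item is easy to show: if the item is ever passed or assigned by value, the mutex is copied along with it and the copies no longer exclude each other. A minimal, self-contained sketch with hypothetical names, flagged by go vet's copylocks check:

    package main

    import (
        "fmt"
        "sync"
    )

    // bucket embeds its own lock, as LeakyBucketItem did before this commit.
    type bucket struct {
        mu        sync.Mutex
        Remaining float64
    }

    // drain receives the bucket by value: the mutex is copied with the data,
    // so locking it no longer protects the caller's bucket. `go vet` reports
    // this as a copylocks mistake.
    func drain(b bucket) {
        b.mu.Lock()
        defer b.mu.Unlock()
        b.Remaining--
    }

    func main() {
        b := bucket{Remaining: 10}
        drain(b)
        fmt.Println(b.Remaining) // still 10: only the copy was drained
    }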
5 changes: 3 additions & 2 deletions store_test.go
@@ -150,8 +150,9 @@ func (ms *NoOpStore) Get(ctx context.Context, r *gubernator.RateLimitReq) (*gube
// add items to the cache in parallel thus creating a race condition the code must then handle.
func TestHighContentionFromStore(t *testing.T) {
const (
-numGoroutines = 1_000
-numKeys = 400
+// Increase these number to improve the chance of contention, but at the cost of test speed.
+numGoroutines = 500
+numKeys = 100
)
store := &NoOpStore{}
srv := newV1Server(t, "localhost:0", gubernator.Config{
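
The constants control how hard the test leans on the cache: many goroutines iterating over a small key space means several of them miss the cache for the same key at the same moment, all fall through to the NoOpStore, and then race to insert the resulting item. The snippet below is not the repository's test, just a self-contained sketch of that contention pattern, with sync.Map and a stub loader standing in for the gubernator cache and store:

    package main

    import (
        "fmt"
        "sync"
    )

    // loadFromStore stands in for the persistent-store lookup that every
    // cache miss falls through to (the NoOpStore in the real test).
    func loadFromStore(key string) string { return "item-for-" + key }

    func main() {
        const (
            numGoroutines = 500
            numKeys       = 100
        )

        var cache sync.Map
        var wg sync.WaitGroup

        for g := 0; g < numGoroutines; g++ {
            wg.Add(1)
            go func() {
                defer wg.Done()
                for k := 0; k < numKeys; k++ {
                    key := fmt.Sprintf("key_%d", k)
                    // Many goroutines miss the same key at once and all call
                    // the store; LoadOrStore makes exactly one insert win per
                    // key, which is the property the fixed code must guarantee.
                    cache.LoadOrStore(key, loadFromStore(key))
                }
            }()
        }
        wg.Wait()
    }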
