Commit bbc5ab6: Otter is now available as a cache option
thrawn01 committed May 9, 2024
1 parent 548b799 commit bbc5ab6
Showing 20 changed files with 834 additions and 831 deletions.
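At a glance, opting into the new cache comes down to the CacheFactory hook that NewCacheManager (added below in cache_manager.go) calls as conf.CacheFactory(conf.CacheSize). Here is a minimal sketch, assuming the Config field names used in this diff (CacheSize, CacheFactory) and that NewOtterCache accepts a maximum size the way NewLRUCache does; all other Config fields are omitted:

	package main

	import "github.com/gubernator-io/gubernator/v2"

	func main() {
		conf := gubernator.Config{
			// CacheSize and CacheFactory are the two fields NewCacheManager
			// reads in this commit; every other field is elided here.
			CacheSize: 50_000,
			CacheFactory: func(maxSize int) (gubernator.Cache, error) {
				// Choose Otter instead of the default mutex-guarded LRU cache.
				return gubernator.NewOtterCache(maxSize)
			},
		}
		if _, err := gubernator.NewCacheManager(conf); err != nil {
			panic(err)
		}
	}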
117 changes: 70 additions & 47 deletions benchmark_cache_test.go
@@ -1,160 +1,183 @@
 package gubernator_test
 
 import (
-	"strconv"
+	"math/rand"
 	"sync"
 	"testing"
 	"time"
 
 	"github.com/gubernator-io/gubernator/v2"
 	"github.com/mailgun/holster/v4/clock"
+	"github.com/stretchr/testify/require"
 )
 
 func BenchmarkCache(b *testing.B) {
 	testCases := []struct {
 		Name         string
-		NewTestCache func() gubernator.Cache
+		NewTestCache func() (gubernator.Cache, error)
 		LockRequired bool
 	}{
 		{
 			Name: "LRUCache",
-			NewTestCache: func() gubernator.Cache {
-				return gubernator.NewLRUCache(0)
+			NewTestCache: func() (gubernator.Cache, error) {
+				return gubernator.NewLRUCache(0), nil
 			},
 			LockRequired: true,
 		},
+		{
+			Name: "OtterCache",
+			NewTestCache: func() (gubernator.Cache, error) {
+				return gubernator.NewOtterCache(0)
+			},
+			LockRequired: false,
+		},
 	}
 
 	for _, testCase := range testCases {
 		b.Run(testCase.Name, func(b *testing.B) {
 			b.Run("Sequential reads", func(b *testing.B) {
-				cache := testCase.NewTestCache()
+				cache, err := testCase.NewTestCache()
+				require.NoError(b, err)
 				expire := clock.Now().Add(time.Hour).UnixMilli()
+				keys := GenerateRandomKeys()
 
-				for i := 0; i < b.N; i++ {
-					key := strconv.Itoa(i)
+				for _, key := range keys {
 					item := &gubernator.CacheItem{
 						Key:      key,
-						Value:    i,
+						Value:    "value:" + key,
 						ExpireAt: expire,
 					}
 					cache.Add(item)
 				}
 
+				mask := len(keys) - 1
 				b.ReportAllocs()
 				b.ResetTimer()
 
 				for i := 0; i < b.N; i++ {
-					key := strconv.Itoa(i)
-					_, _ = cache.GetItem(key)
+					index := int(rand.Uint32() & uint32(mask))
+					_, _ = cache.GetItem(keys[index&mask])
 				}
 			})
 
 			b.Run("Sequential writes", func(b *testing.B) {
-				cache := testCase.NewTestCache()
+				cache, err := testCase.NewTestCache()
+				require.NoError(b, err)
 				expire := clock.Now().Add(time.Hour).UnixMilli()
+				keys := GenerateRandomKeys()
 
+				mask := len(keys) - 1
 				b.ReportAllocs()
 				b.ResetTimer()
 
 				for i := 0; i < b.N; i++ {
+					index := int(rand.Uint32() & uint32(mask))
 					item := &gubernator.CacheItem{
-						Key:      strconv.Itoa(i),
-						Value:    i,
+						Key:      keys[index&mask],
+						Value:    "value:" + keys[index&mask],
 						ExpireAt: expire,
 					}
 					cache.Add(item)
 				}
 			})
 
 			b.Run("Concurrent reads", func(b *testing.B) {
-				cache := testCase.NewTestCache()
+				cache, err := testCase.NewTestCache()
+				require.NoError(b, err)
 				expire := clock.Now().Add(time.Hour).UnixMilli()
+				keys := GenerateRandomKeys()
 
-				for i := 0; i < b.N; i++ {
-					key := strconv.Itoa(i)
+				for _, key := range keys {
 					item := &gubernator.CacheItem{
 						Key:      key,
-						Value:    i,
+						Value:    "value:" + key,
 						ExpireAt: expire,
 					}
 					cache.Add(item)
 				}
 
-				var wg sync.WaitGroup
 				var mutex sync.Mutex
-				var task func(i int)
+				var task func(key string)
 
 				if testCase.LockRequired {
-					task = func(i int) {
+					task = func(key string) {
 						mutex.Lock()
 						defer mutex.Unlock()
-						key := strconv.Itoa(i)
 						_, _ = cache.GetItem(key)
-						wg.Done()
 					}
 				} else {
-					task = func(i int) {
-						key := strconv.Itoa(i)
+					task = func(key string) {
 						_, _ = cache.GetItem(key)
-						wg.Done()
 					}
 				}
 
 				b.ReportAllocs()
 				b.ResetTimer()
 
-				for i := 0; i < b.N; i++ {
-					wg.Add(1)
-					go task(i)
-				}
+				mask := len(keys) - 1
+
+				b.RunParallel(func(pb *testing.PB) {
+					index := int(rand.Uint32() & uint32(mask))
+					for pb.Next() {
+						task(keys[index&mask])
+					}
+				})
 
-				wg.Wait()
 			})
 
 			b.Run("Concurrent writes", func(b *testing.B) {
-				cache := testCase.NewTestCache()
+				cache, err := testCase.NewTestCache()
+				require.NoError(b, err)
 				expire := clock.Now().Add(time.Hour).UnixMilli()
+				keys := GenerateRandomKeys()
 
-				var wg sync.WaitGroup
 				var mutex sync.Mutex
-				var task func(i int)
+				var task func(key string)
 
 				if testCase.LockRequired {
-					task = func(i int) {
+					task = func(key string) {
 						mutex.Lock()
 						defer mutex.Unlock()
 						item := &gubernator.CacheItem{
-							Key:      strconv.Itoa(i),
-							Value:    i,
+							Key:      key,
+							Value:    "value:" + key,
 							ExpireAt: expire,
 						}
 						cache.Add(item)
-						wg.Done()
 					}
 				} else {
-					task = func(i int) {
+					task = func(key string) {
 						item := &gubernator.CacheItem{
-							Key:      strconv.Itoa(i),
-							Value:    i,
+							Key:      key,
+							Value:    "value:" + key,
 							ExpireAt: expire,
 						}
 						cache.Add(item)
-						wg.Done()
 					}
 				}
 
+				mask := len(keys) - 1
 				b.ReportAllocs()
 				b.ResetTimer()
 
-				for i := 0; i < b.N; i++ {
-					wg.Add(1)
-					go task(i)
-				}
-
-				wg.Wait()
+				b.RunParallel(func(pb *testing.PB) {
+					index := int(rand.Uint32() & uint32(mask))
+					for pb.Next() {
+						task(keys[index&mask])
+					}
+				})
 			})
 
 		})
 	}
 }
+
+const cacheSize = 32768
+
+func GenerateRandomKeys() []string {
+	keys := make([]string, 0, cacheSize)
+	for i := 0; i < cacheSize; i++ {
+		keys = append(keys, gubernator.RandomString(20))
+	}
+	return keys
+}
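The rewritten benchmarks pick keys from a pre-generated pool instead of calling strconv.Itoa inside the timed loop. Because cacheSize (32768) is a power of two, len(keys) - 1 is an all-ones bit mask, so keys[index&mask] selects a key with a single AND rather than a modulo. The suite runs with standard Go tooling, for example:

	go test -run='^$' -bench=BenchmarkCache -benchmem ./...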
153 changes: 153 additions & 0 deletions cache_manager.go
@@ -0,0 +1,153 @@
/*
Copyright 2024 Derrick J. Wippler

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

   http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package gubernator

import (
	"context"
	"sync"

	"github.com/pkg/errors"
)

type CacheManager interface {
	GetRateLimit(context.Context, *RateLimitReq, RateLimitReqState) (*RateLimitResp, error)
	GetCacheItem(context.Context, string) (*CacheItem, bool, error)
	AddCacheItem(context.Context, string, *CacheItem) error
	Store(ctx context.Context) error
	Load(context.Context) error
	Close() error
}

type otterCacheManager struct {
	conf  Config
	cache Cache
}

// NewCacheManager creates a new instance of the CacheManager interface using
// the otter cache (https://maypok86.github.io/otter/)
func NewCacheManager(conf Config) (CacheManager, error) {
	cache, err := conf.CacheFactory(conf.CacheSize)
	if err != nil {
		return nil, err
	}
	return &otterCacheManager{
		cache: cache,
		conf:  conf,
	}, nil
}

// GetRateLimit fetches the item from the cache if it exists, and performs the
// appropriate rate limit calculation.
func (m *otterCacheManager) GetRateLimit(ctx context.Context, req *RateLimitReq, state RateLimitReqState) (*RateLimitResp, error) {
	var rlResponse *RateLimitResp
	var err error

	switch req.Algorithm {
	case Algorithm_TOKEN_BUCKET:
		rlResponse, err = tokenBucket(ctx, m.conf.Store, m.cache, req, state)
		if err != nil {
			msg := "Error in tokenBucket"
			countError(err, msg)
		}

	case Algorithm_LEAKY_BUCKET:
		rlResponse, err = leakyBucket(ctx, m.conf.Store, m.cache, req, state)
		if err != nil {
			msg := "Error in leakyBucket"
			countError(err, msg)
		}

	default:
		err = errors.Errorf("Invalid rate limit algorithm '%d'", req.Algorithm)
	}

	return rlResponse, err
}

// Store saves every cache item into persistent storage provided via Config.Loader
func (m *otterCacheManager) Store(ctx context.Context) error {
	out := make(chan *CacheItem, 500)
	var wg sync.WaitGroup

	// Stream every item in the cache into the out channel, stopping
	// early if the context is cancelled.
	wg.Add(1)
	go func() {
		defer wg.Done()
		for item := range m.cache.Each() {
			select {
			case out <- item:

			case <-ctx.Done():
				return
			}
		}
	}()

	// Close the channel once the producer above finishes, so Loader.Save()
	// sees the end of the stream.
	go func() {
		wg.Wait()
		close(out)
	}()

	if ctx.Err() != nil {
		return ctx.Err()
	}

	if err := m.conf.Loader.Save(out); err != nil {
		return errors.Wrap(err, "while calling m.conf.Loader.Save()")
	}
	return nil
}

// Close closes the cache manager
func (m *otterCacheManager) Close() error {
	return m.cache.Close()
}

// Load cache items from persistent storage provided via Config.Loader
func (m *otterCacheManager) Load(ctx context.Context) error {
	ch, err := m.conf.Loader.Load()
	if err != nil {
		return errors.Wrap(err, "Error in loader.Load")
	}

	for {
		var item *CacheItem
		var ok bool

		select {
		case item, ok = <-ch:
			if !ok {
				return nil
			}
		case <-ctx.Done():
			return ctx.Err()
		}
		_ = m.cache.Add(item)
	}
}

// GetCacheItem returns an item from the cache
func (m *otterCacheManager) GetCacheItem(_ context.Context, key string) (*CacheItem, bool, error) {
	item, ok := m.cache.GetItem(key)
	return item, ok, nil
}

// AddCacheItem adds an item to the cache. CacheItem.Key must be populated:
// the item is stored under item.Key, and the key argument is ignored.
func (m *otterCacheManager) AddCacheItem(_ context.Context, _ string, item *CacheItem) error {
	_ = m.cache.Add(item)
	return nil
}
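Taken together, the interface above reduces typical use to an add/get round trip. A rough sketch follows; the demo helper and key names are invented for illustration, and manager construction (shown near the top of this page) is elided:

	package example

	import (
		"context"
		"fmt"
		"time"

		"github.com/gubernator-io/gubernator/v2"
	)

	// demo shows the AddCacheItem/GetCacheItem round trip; it is not
	// part of the commit.
	func demo(ctx context.Context, m gubernator.CacheManager) error {
		item := &gubernator.CacheItem{
			Key:      "account:1234",
			Value:    "value:account:1234",
			ExpireAt: time.Now().Add(time.Hour).UnixMilli(),
		}
		// AddCacheItem stores the item under item.Key; the key argument
		// is unused by the otter-backed implementation.
		if err := m.AddCacheItem(ctx, item.Key, item); err != nil {
			return err
		}

		got, ok, err := m.GetCacheItem(ctx, item.Key)
		if err != nil {
			return err
		}
		if ok {
			fmt.Println(got.Value)
		}
		return nil
	}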