Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

V1 #8

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
70 changes: 51 additions & 19 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@ mnemosyneManager := mnemosyne.NewMnemosyne(config, nil, nil)
cacheInstance := mnemosyneManager.select("result-cache")
```

### Working with a cacheInstance
### Working with a CacheInstance
```go
cacheInstance.Set(context, key, value)
var myCachedData myType
err := cacheInstance.Get(context, key, &myCachedData)
// cache miss is also an Error
// remember: cacheMiss is also an Error
```

## Configuration
Expand Down Expand Up @@ -53,6 +53,7 @@ cache:
ttl: 24h
amnesia: 0
compression: true

my-user-cache:
soft-ttl: 2h
layers:
Expand All @@ -73,27 +74,57 @@ cache:
compression: true
```

`soft-ttl` is an instance-wide TTL which when expired will **NOT** remove the data from the instance, but warns that the data is old

Each cache layer can be of types `redis`, `gaurdian`, `memory` or `tiny`. `redis` is used for a single node Redis server, `gaurdian` is used for a master-slave Redis cluster configuration, `memory` uses the BigCache library to provide an efficient and fast in-memory cache, `tiny` uses the native sync.map data structure to store smaller cache values in memory (used for low-write caches).
Note: all of the cache types are sync-safe, meaning they can be safely used from simultaneously running goroutines.
**Each cache layer can be of the following types:**

`redis` is used for a single node Redis server.

`gaurdian` [Depricated] is used for a master-slave Redis cluster configuration but it's being depricated in favor of `rediscluster`.

`rediscluster` is an all-encompassing configuration for both client side sharding as well as cluster Redis (or both at the same time).

`memory` uses the BigCache library to provide an efficient and fast in-memory cache.

`tiny` uses the native sync.map data structure to store smaller cache values in memory (used for low-write caches).

_Note:_ all of the cache types are sync-safe, meaning they can be safely used from simultaneously running goroutines.

#### Instance Configs:

#### Common layer configs:
**`soft-ttl`** is an instance-wide TTL which when expired will **NOT** remove the data from the instance, but warns that the data is old.

`amnesia` is a stochastic fall-through mechanism which allows for a higher layer to be updated from a lower layer by the way of an artificial cache-miss,
a 0 amnesia means that the layers will never miss a data that they actually have, a 10 amnesia means when a key is present in the cache, 90% of the time it is returned but 10% of the time it is ignored and is treated as a cache-miss. a 100 amnesia effectively turns the layer off. (Default: 0)
#### Common Layer Configs:

`compression` is whther the data is compressed before being put into the cache memory. Currently only Zlib compression is supported. (Default: false)
**`amnesia`** is a stochastic fall-through mechanism which allows for a higher layer to be updated from a lower layer by the way of an artificial cache-miss,
an amnesia value of 0 means that the layers will never miss a data that they actually have, an amnesia value of 10 means when a key is present in the cache, 90% of the time it is returned but 10% of the time it is ignored and is treated as a cache-miss. a 100% amnesia effectively turns the layer off. (Default: 0)
_Note:_ 'SET' operations ignore Amnesia, to compeletly turn off a layer, remove its name from the layer list.

`ttl` is the hard Time To Live for the data in this particular layer, after which the data is expired and is expected to be removed.
**`compression`** dictates whther the data is compressed before being put into the cache memory. Currently only Zlib compression is supported. (Default: false)

#### Type-spesific layer configs:
**`ttl`** is the hard Time-To-Live for the data in this particular layer, after which the data is expired and is expected to be removed.

`db` [`redis` - `gaurdian`] is the Redis DB number to be used. (Default:0)
`idle-timeout` [`redis` - `gaurdian`] is the timeout for idle connections to the Redis Server (see Redis documentation) (Default:0 - no timeout)
`address` [`redis` - `gaurdian`] is the Redis Server's Address (the master's address in case of a cluster)
`slaves` [`gaurdian`] is a **list** of Redis servers addresses pertaining to the slave nodes.
`max-memory` [`memory`] is the maximum amount of system memory which can be used by this particular layer.
#### Type-spesific Layer Configs:

**`db`** {`redis` - `gaurdian`} is the Redis DB number to be used. (Default:0)
**`idle-timeout`** {`redis` - `gaurdian`} is the timeout for idle connections to the Redis Server (see Redis documentation) (Default:0 - no timeout)
**`address`** {`redis` - `gaurdian` - `rediscluster`} is the Redis Server's Address (the master's address in case of a cluster)
**`slaves`** {`gaurdian` - `rediscluster`} is a **list** of Redis servers addresses pertaining to the slave nodes.
**`max-memory`** {`memory`} is the maximum amount of system memory which can be used by this particular layer.


### Epimetheus Integration Guide

Add these two functions to your `container.go` file as well as to the `wire.build()` so _wire-gen_ can recognize the proper timer & counter to pass to Mnemosyne.

```go
func getCommTimer(epi *epimetheus.Epimetheus) mnemosyne.ITimer {
return epi.CommTimer
}

func getCacheRate(epi *epimetheus.Epimetheus) mnemosyne.ICounter {
return epi.CacheRate
}
```


## Documentation
Expand All @@ -119,9 +150,10 @@ We use [SemVer](http://semver.org/) for versioning. For the versions available,

## Authors

* **Ramtin Rostami** - *Initial work* - [rrostami](https://github.com/rrostami)
* **Pedram Teymoori** - *Initial work* - [pedramteymoori](https://github.com/pedramteymoori)
* **Parsa abdollahi** - *Initial work* - []()
* **Ramtin Rostami** [rrostami](https://github.com/rrostami) - *Initial work & Maintaining*
* **Pedram Teymoori** [pedramteymoori](https://github.com/pedramteymoori) - *Initial work & Maintaining*
* **Parsa abdollahi** - *Initial work*
* **Ava Abderezaei** [avv-va](https://github.com/avv-va) - *Tests*

See also the list of [contributors](https://github.com/cafebazaar/Mnemosyne/graphs/contributors) who participated in this project.

Expand Down
76 changes: 76 additions & 0 deletions c-memory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package mnemosyne

import (
"context"
"math/rand"
"time"

"github.com/allegro/bigcache"
"github.com/sirupsen/logrus"
)

type inMemoryCache struct {
baseCache
base *bigcache.BigCache
cacheTTL time.Duration
}

func NewInMemoryCache(opts *CacheOpts) *inMemoryCache {
internalOpts := bigcache.Config{
Shards: 1024,
LifeWindow: opts.cacheTTL,
MaxEntriesInWindow: 1100 * 10 * 60,
MaxEntrySize: 500,
Verbose: false,
HardMaxCacheSize: opts.memOpts.maxMem,
CleanWindow: 1 * time.Minute,
}
cacheInstance, err := bigcache.NewBigCache(internalOpts)
if err != nil {
logrus.Errorf("InMemCache %s Initialization Error: %v", opts.layerName, err)
}
return &inMemoryCache{
baseCache: baseCache{
layerName: opts.layerName,
amnesiaChance: opts.amnesiaChance,
compressionEnabled: opts.compressionEnabled,
},
base: cacheInstance,
cacheTTL: opts.cacheTTL,
}
}

func (mc *inMemoryCache) Get(ctx context.Context, key string) (*cachableRet, error) {
if mc.amnesiaChance > rand.Intn(100) {
return nil, newAmnesiaError(mc.amnesiaChance)
}
rawBytes, err := mc.base.Get(key)
if err != nil {
return nil, err
}
return finalizeCacheResponse(rawBytes, mc.compressionEnabled)
}

func (mc *inMemoryCache) Set(ctx context.Context, key string, value interface{}) error {
finalData, err := prepareCachePayload(value, mc.compressionEnabled)
if err != nil {
return err
}
return mc.base.Set(key, finalData)
}

func (mc *inMemoryCache) Delete(ctx context.Context, key string) error {
return mc.base.Delete(key)
}

func (mc *inMemoryCache) Clear() error {
return mc.base.Reset()
}

func (mc *inMemoryCache) TTL(ctx context.Context, key string) time.Duration {
return time.Second * 0
}

func (mc *inMemoryCache) Name() string {
return mc.layerName
}
158 changes: 158 additions & 0 deletions c-redis.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
package mnemosyne

import (
"context"
"hash/fnv"
"math/rand"
"time"

"github.com/go-redis/redis"
"github.com/sirupsen/logrus"
)

type RedisClusterAddress struct {
MasterAddr string `mapstructure:"address"`
SlaveAddrs []string `mapstructure:"slaves"`
}

type RedisOpts struct {
db int
idleTimeout time.Duration
shards []*RedisClusterAddress
}

type clusterClient struct {
master *redis.Client
slaves []*redis.Client
}

type redisCache struct {
baseCache
baseClients []*clusterClient
cacheTTL time.Duration
watcher ITimer
}

func makeClient(addr string, db int, idleTimeout time.Duration) *redis.Client {
redisOptions := &redis.Options{
Addr: addr,
DB: db,
}
if idleTimeout >= time.Second {
redisOptions.IdleTimeout = idleTimeout
}
newClient := redis.NewClient(redisOptions)

if err := newClient.Ping().Err(); err != nil {
logrus.WithError(err).WithField("address", addr).Error("error pinging Redis")
}
return newClient
}

func NewShardedClusterRedisCache(opts *CacheOpts, watcher ITimer) *redisCache {
rc := &redisCache{
baseCache: baseCache{
layerName: opts.layerName,
amnesiaChance: opts.amnesiaChance,
compressionEnabled: opts.compressionEnabled,
},
cacheTTL: opts.cacheTTL,
watcher: watcher,
}
rc.baseClients = make([]*clusterClient, len(opts.redisOpts.shards))
for i, shard := range opts.redisOpts.shards {
rc.baseClients[i].master = makeClient(shard.MasterAddr,
opts.redisOpts.db,
opts.redisOpts.idleTimeout)

rc.baseClients[i].slaves = make([]*redis.Client, len(shard.SlaveAddrs))
for j, slv := range shard.SlaveAddrs {
rc.baseClients[i].slaves[j] = makeClient(slv,
opts.redisOpts.db,
opts.redisOpts.idleTimeout)
}
}
return rc
}

func (rc *redisCache) Get(ctx context.Context, key string) (*cachableRet, error) {
if rc.amnesiaChance > rand.Intn(100) {
return nil, newAmnesiaError(rc.amnesiaChance)
}
client := rc.pickClient(key, false).WithContext(ctx)
startMarker := rc.watcher.Start()
strValue, err := client.Get(key).Result()
if err == nil {
rc.watcher.Done(startMarker, rc.layerName, "get", "ok")
} else if err == redis.Nil {
rc.watcher.Done(startMarker, rc.layerName, "get", "miss")
} else {
rc.watcher.Done(startMarker, rc.layerName, "get", "error")
}
rawBytes := []byte(strValue)
return finalizeCacheResponse(rawBytes, rc.compressionEnabled)
}

func (rc *redisCache) Set(ctx context.Context, key string, value interface{}) error {
finalData, err := prepareCachePayload(value, rc.compressionEnabled)
if err != nil {
return err
}
client := rc.pickClient(key, true).WithContext(ctx)
startMarker := rc.watcher.Start()
setError := client.Set(key, finalData, rc.cacheTTL).Err()
if setError != nil {
rc.watcher.Done(startMarker, rc.layerName, "set", "error")
} else {
rc.watcher.Done(startMarker, rc.layerName, "set", "ok")
}
return setError
}
func (rc *redisCache) Delete(ctx context.Context, key string) error {
client := rc.pickClient(key, true).WithContext(ctx)
return client.Del(key).Err()
}

func (rc *redisCache) Clear() error {
for _, cl := range rc.baseClients {
client := cl.master
err := client.FlushDB().Err()
if err != nil {
return err
}
}
return nil
}

func (rc *redisCache) TTL(ctx context.Context, key string) time.Duration {
client := rc.pickClient(key, false).WithContext(ctx)
res, err := client.TTL(key).Result()
if err != nil {
return time.Second * 0
}
return res
}

func (rc *redisCache) pickClient(key string, modification bool) *redis.Client {
shard := rc.shardKey(key)
if modification || len(rc.baseClients[shard].slaves) == 0 {
return rc.baseClients[shard].master
}
cl := rand.Intn(len(rc.baseClients[shard].slaves))
return rc.baseClients[shard].slaves[cl]
}

func (rc *redisCache) shardKey(key string) int {
shards := len(rc.baseClients)
if shards == 1 {
return 0
}
hasher := fnv.New32a()
hasher.Write([]byte(key))
keyHash := int(hasher.Sum32())
return keyHash % shards
}

func (rc *redisCache) Name() string {
return rc.layerName
}
Loading