Skip to content

Commit

Permalink
support redis cluster as cache store (#564)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhenghaoz authored Sep 30, 2022
1 parent 30d5768 commit 8007e0d
Show file tree
Hide file tree
Showing 13 changed files with 359 additions and 67 deletions.
8 changes: 8 additions & 0 deletions .github/workflows/build_test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -200,11 +200,19 @@ jobs:
- name: Check out code into the Go module directory
uses: actions/checkout@v2

- name: Build the stack
run: docker-compose -f storage/docker-compose-redis.yml up -d

- name: Test MariaDB
run: go test ./storage/data -run ^TestMySQL_
env:
MYSQL_URI: mysql://root:password@tcp(localhost:${{ job.services.mariadb.ports[3306] }})/

- name: Test Redis Cluster
run: go test ./storage/cache -run ^TestRedis
env:
REDIS_URI: redis+cluster://localhost:7000

golangci:
name: lint
runs-on: ubuntu-latest
Expand Down
2 changes: 1 addition & 1 deletion client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package client

import (
"context"
"github.com/go-redis/redis/v8"
"github.com/go-redis/redis/v9"
"github.com/stretchr/testify/suite"
"testing"
"time"
Expand Down
1 change: 1 addition & 0 deletions config/config.toml.template
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
# The database for caching, support Redis, MySQL, Postgres and MongoDB:
# redis://<user>:<password>@<host>:<port>/<db_number>
# rediss://<user>:<password>@<host>:<port>/<db_number>
# redis+cluster://<user>:<password>@<host1>:<port1>,<host2>:<port2>,...,<hostN>:<portN>
# postgres://bob:[email protected]:5432/mydb?sslmode=verify-full
# postgresql://bob:[email protected]:5432/mydb?sslmode=verify-full
# mongodb://[username:password@]host1[:port1][,...hostN[:portN]][/[defaultauthdb][?options]]
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ require (
github.com/go-playground/locales v0.14.0
github.com/go-playground/universal-translator v0.18.0
github.com/go-playground/validator/v10 v10.11.0
github.com/go-redis/redis/v8 v8.11.5
github.com/go-redis/redis/v9 v9.0.0-beta.2
github.com/go-resty/resty/v2 v2.7.0
github.com/go-sql-driver/mysql v1.6.0
github.com/google/uuid v1.3.0
Expand Down
6 changes: 3 additions & 3 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,8 @@ github.com/go-playground/universal-translator v0.18.0 h1:82dyy6p4OuJq4/CByFNOn/j
github.com/go-playground/universal-translator v0.18.0/go.mod h1:UvRDBj+xPUEGrFYl+lu/H90nyDXpg0fqeB/AQUGNTVA=
github.com/go-playground/validator/v10 v10.11.0 h1:0W+xRM511GY47Yy3bZUbJVitCNg2BOGlCyvTqsp/xIw=
github.com/go-playground/validator/v10 v10.11.0/go.mod h1:i+3WkQ1FvaUjjxh1kSvIA4dMGDBiPU55YFDl0WbKdWU=
github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI=
github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo=
github.com/go-redis/redis/v9 v9.0.0-beta.2 h1:ZSr84TsnQyKMAg8gnV+oawuQezeJR11/09THcWCQzr4=
github.com/go-redis/redis/v9 v9.0.0-beta.2/go.mod h1:Bldcd/M/bm9HbnNPi/LUtYBSD8ttcZYBMupwMXhdU0o=
github.com/go-resty/resty/v2 v2.7.0 h1:me+K9p3uhSmXtrBZ4k9jcEAfJmuC8IivWHwaLZwPrFY=
github.com/go-resty/resty/v2 v2.7.0/go.mod h1:9PWDzw47qPphMRFfhsyk0NnSgvluHcljSMVIq3w7q0I=
github.com/go-sql-driver/mysql v1.6.0 h1:BCTh4TKNUYmOmMUcQ3IipzF5prigylS7XXjEkfCHuOE=
Expand Down Expand Up @@ -397,7 +397,7 @@ github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRW
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE=
github.com/onsi/gomega v1.18.1 h1:M1GfJqGRrBrrGGsbxzV5dqM2U2ApXefZCQpkukxYRLE=
github.com/onsi/gomega v1.20.0 h1:8W0cWlwFkflGPLltQvLRB7ZVD5HuP6ng320w2IS245Q=
github.com/orcaman/concurrent-map v1.0.0 h1:I/2A2XPCb4IuQWcQhBhSwGfiuybl/J0ev9HDbW65HOY=
github.com/orcaman/concurrent-map v1.0.0/go.mod h1:Lu3tH6HLW3feq74c2GC+jIMS/K2CFcDWnWD9XkenwhI=
github.com/pelletier/go-toml v1.9.5 h1:4yBQzkHv+7BHq2PQUZF3Mx0IYxG7LsP222s7Agd3ve8=
Expand Down
2 changes: 1 addition & 1 deletion server/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
"encoding/json"
"fmt"
"github.com/emicklei/go-restful/v3"
"github.com/go-redis/redis/v8"
"github.com/go-redis/redis/v9"
"github.com/go-resty/resty/v2"
"github.com/samber/lo"
"github.com/stretchr/testify/require"
Expand Down
11 changes: 10 additions & 1 deletion storage/cache/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
"database/sql"
"github.com/araddon/dateparse"
"github.com/dzwvip/oracle"
"github.com/go-redis/redis/v8"
"github.com/go-redis/redis/v9"
"github.com/juju/errors"
"github.com/samber/lo"
"github.com/zhenghaoz/gorse/base/log"
Expand Down Expand Up @@ -304,6 +304,15 @@ func Open(path, tablePrefix string) (Database, error) {
database.client = redis.NewClient(opt)
database.TablePrefix = storage.TablePrefix(tablePrefix)
return database, nil
} else if strings.HasPrefix(path, storage.RedisClusterPrefix) {
opt, err := ParseRedisClusterURL(path)
if err != nil {
return nil, err
}
database := new(Redis)
database.client = redis.NewClusterClient(opt)
database.TablePrefix = storage.TablePrefix(tablePrefix)
return database, nil
} else if strings.HasPrefix(path, storage.MongoPrefix) || strings.HasPrefix(path, storage.MongoSrvPrefix) {
// connect to database
database := new(MongoDB)
Expand Down
216 changes: 204 additions & 12 deletions storage/cache/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,176 @@ package cache

import (
"context"
"github.com/go-redis/redis/v8"
"fmt"
"github.com/go-redis/redis/v9"
"github.com/juju/errors"
"github.com/zhenghaoz/gorse/storage"
"net/url"
"strconv"
"strings"
"time"
)

func ParseRedisClusterURL(redisURL string) (*redis.ClusterOptions, error) {
options := &redis.ClusterOptions{}
uri := redisURL

var err error
if strings.HasPrefix(redisURL, storage.RedisClusterPrefix) {
uri = uri[len(storage.RedisClusterPrefix):]
} else {
return nil, fmt.Errorf("scheme must be \"redis+cluster\"")
}

if idx := strings.Index(uri, "@"); idx != -1 {
userInfo := uri[:idx]
uri = uri[idx+1:]

username := userInfo
var password string

if idx := strings.Index(userInfo, ":"); idx != -1 {
username = userInfo[:idx]
password = userInfo[idx+1:]
}

// Validate and process the username.
if strings.Contains(username, "/") {
return nil, fmt.Errorf("unescaped slash in username")
}
options.Username, err = url.PathUnescape(username)
if err != nil {
return nil, errors.Wrap(err, fmt.Errorf("invalid username"))
}

// Validate and process the password.
if strings.Contains(password, ":") {
return nil, fmt.Errorf("unescaped colon in password")
}
if strings.Contains(password, "/") {
return nil, fmt.Errorf("unescaped slash in password")
}
options.Password, err = url.PathUnescape(password)
if err != nil {
return nil, errors.Wrap(err, fmt.Errorf("invalid password"))
}
}

// fetch the hosts field
hosts := uri
if idx := strings.IndexAny(uri, "/?@"); idx != -1 {
if uri[idx] == '@' {
return nil, fmt.Errorf("unescaped @ sign in user info")
}
hosts = uri[:idx]
}

options.Addrs = strings.Split(hosts, ",")
uri = uri[len(hosts):]
if len(uri) > 0 && uri[0] == '/' {
uri = uri[1:]
}

// grab connection arguments from URI
connectionArgsFromQueryString, err := extractQueryArgsFromURI(uri)
if err != nil {
return nil, err
}
for _, pair := range connectionArgsFromQueryString {
err = addOption(options, pair)
if err != nil {
return nil, err
}
}

return options, nil
}

func extractQueryArgsFromURI(uri string) ([]string, error) {
if len(uri) == 0 {
return nil, nil
}

if uri[0] != '?' {
return nil, errors.New("must have a ? separator between path and query")
}

uri = uri[1:]
if len(uri) == 0 {
return nil, nil
}
return strings.FieldsFunc(uri, func(r rune) bool { return r == ';' || r == '&' }), nil
}

type optionHandler struct {
int *int
bool *bool
duration *time.Duration
}

func addOption(options *redis.ClusterOptions, pair string) error {
kv := strings.SplitN(pair, "=", 2)
if len(kv) != 2 || kv[0] == "" {
return fmt.Errorf("invalid option")
}

key, err := url.QueryUnescape(kv[0])
if err != nil {
return errors.Wrap(err, errors.Errorf("invalid option key %q", kv[0]))
}

value, err := url.QueryUnescape(kv[1])
if err != nil {
return errors.Wrap(err, errors.Errorf("invalid option value %q", kv[1]))
}

handlers := map[string]optionHandler{
"max_retries": {int: &options.MaxRetries},
"min_retry_backoff": {duration: &options.MinRetryBackoff},
"max_retry_backoff": {duration: &options.MaxRetryBackoff},
"dial_timeout": {duration: &options.DialTimeout},
"read_timeout": {duration: &options.ReadTimeout},
"write_timeout": {duration: &options.WriteTimeout},
"pool_fifo": {bool: &options.PoolFIFO},
"pool_size": {int: &options.PoolSize},
"pool_timeout": {duration: &options.PoolTimeout},
"min_idle_conns": {int: &options.MinIdleConns},
"max_idle_conns": {int: &options.MaxIdleConns},
"conn_max_idle_time": {duration: &options.ConnMaxIdleTime},
"conn_max_lifetime": {duration: &options.ConnMaxLifetime},
}

lowerKey := strings.ToLower(key)
if handler, ok := handlers[lowerKey]; ok {
if handler.int != nil {
*handler.int, err = strconv.Atoi(value)
if err != nil {
return errors.Wrap(err, fmt.Errorf("invalid '%s' value: %q", key, value))
}
} else if handler.duration != nil {
*handler.duration, err = time.ParseDuration(value)
if err != nil {
return errors.Wrap(err, fmt.Errorf("invalid '%s' value: %q", key, value))
}
} else if handler.bool != nil {
*handler.bool, err = strconv.ParseBool(value)
if err != nil {
return errors.Wrap(err, fmt.Errorf("invalid '%s' value: %q", key, value))
}
} else {
return fmt.Errorf("redis: unexpected option: %s", key)
}
} else {
return fmt.Errorf("redis: unexpected option: %s", key)
}

return nil
}

// Redis cache storage.
type Redis struct {
storage.TablePrefix
client *redis.Client
client redis.UniversalClient
}

// Close redis connection.
Expand All @@ -39,14 +199,24 @@ func (r *Redis) Init() error {
}

func (r *Redis) Scan(work func(string) error) error {
ctx := context.Background()
if clusterClient, isCluster := r.client.(*redis.ClusterClient); isCluster {
return clusterClient.ForEachMaster(ctx, func(ctx context.Context, client *redis.Client) error {
return r.scan(ctx, client, work)
})
} else {
return r.scan(ctx, r.client, work)
}
}

func (r *Redis) scan(ctx context.Context, client redis.UniversalClient, work func(string) error) error {
var (
ctx = context.Background()
result []string
cursor uint64
err error
)
for {
result, cursor, err = r.client.Scan(ctx, cursor, string(r.TablePrefix)+"*", 0).Result()
result, cursor, err = client.Scan(ctx, cursor, string(r.TablePrefix)+"*", 0).Result()
if err != nil {
return errors.Trace(err)
}
Expand All @@ -62,22 +232,44 @@ func (r *Redis) Scan(work func(string) error) error {
}

func (r *Redis) Purge() error {
ctx := context.Background()
if clusterClient, isCluster := r.client.(*redis.ClusterClient); isCluster {
return clusterClient.ForEachMaster(ctx, func(ctx context.Context, client *redis.Client) error {
return r.purge(ctx, client, isCluster)
})
} else {
return r.purge(ctx, r.client, isCluster)
}
}

func (r *Redis) purge(ctx context.Context, client redis.UniversalClient, isCluster bool) error {
var (
ctx = context.Background()
result []string
cursor uint64
err error
)
for {
result, cursor, err = r.client.Scan(ctx, cursor, string(r.TablePrefix)+"*", 0).Result()
result, cursor, err = client.Scan(ctx, cursor, string(r.TablePrefix)+"*", 0).Result()
if err != nil {
return errors.Trace(err)
}
if len(result) == 0 {
return nil
}
if err = r.client.Del(ctx, result...).Err(); err != nil {
return errors.Trace(err)
if isCluster {
p := client.Pipeline()
for _, key := range result {
if err = p.Del(ctx, key).Err(); err != nil {
return errors.Trace(err)
}
}
if _, err = p.Exec(ctx); err != nil {
return errors.Trace(err)
}
} else {
if err = client.Del(ctx, result...).Err(); err != nil {
return errors.Trace(err)
}
}
if cursor == 0 {
return nil
Expand Down Expand Up @@ -211,9 +403,9 @@ func (r *Redis) AddSorted(sortedSets ...SortedSet) error {
p := r.client.Pipeline()
for _, sorted := range sortedSets {
if len(sorted.scores) > 0 {
members := make([]*redis.Z, 0, len(sorted.scores))
members := make([]redis.Z, 0, len(sorted.scores))
for _, score := range sorted.scores {
members = append(members, &redis.Z{Member: score.Id, Score: score.Score})
members = append(members, redis.Z{Member: score.Id, Score: score.Score})
}
p.ZAdd(ctx, r.Key(sorted.name), members...)
}
Expand All @@ -224,9 +416,9 @@ func (r *Redis) AddSorted(sortedSets ...SortedSet) error {

// SetSorted set scores in sorted set and clear previous scores.
func (r *Redis) SetSorted(key string, scores []Scored) error {
members := make([]*redis.Z, 0, len(scores))
members := make([]redis.Z, 0, len(scores))
for _, score := range scores {
members = append(members, &redis.Z{Member: score.Id, Score: float64(score.Score)})
members = append(members, redis.Z{Member: score.Id, Score: float64(score.Score)})
}
ctx := context.Background()
pipeline := r.client.Pipeline()
Expand Down
Loading

0 comments on commit 8007e0d

Please sign in to comment.