Skip to content

Commit

Permalink
Cherry-pick dapr#3044
Browse files Browse the repository at this point in the history
Signed-off-by: ItalyPaleAle <[email protected]>
  • Loading branch information
ItalyPaleAle committed Aug 5, 2023
1 parent 31ccb5f commit 659e239
Show file tree
Hide file tree
Showing 3 changed files with 111 additions and 129 deletions.
27 changes: 27 additions & 0 deletions internal/component/redis/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,33 @@ func GetServerVersion(c RedisClient) (string, error) {
return "", fmt.Errorf("could not find redis_version in redis info response")
}

// GetConnectedSlaves returns the number of slaves connected to the Redis master.
func GetConnectedSlaves(ctx context.Context, c RedisClient) (int, error) {
const connectedSlavesReplicas = "connected_slaves:"

res, err := c.DoRead(ctx, "INFO", "replication")
if err != nil {
return 0, err
}

// Response example: https://redis.io/commands/info#return-value
// # Replication\r\nrole:master\r\nconnected_slaves:1\r\n
s, _ := strconv.Unquote(fmt.Sprintf("%q", res))
if len(s) == 0 {
return 0, nil
}

infos := strings.Split(s, "\r\n")
for _, info := range infos {
if strings.HasPrefix(info, connectedSlavesReplicas) {
parsedReplicas, _ := strconv.ParseInt(info[len(connectedSlavesReplicas):], 10, 32)
return int(parsedReplicas), nil
}
}

return 0, nil
}

type RedisError string

func (e RedisError) Error() string { return string(e) }
Expand Down
131 changes: 45 additions & 86 deletions lock/redis/standalone.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,9 @@ package redis

import (
"context"
"errors"
"fmt"
"reflect"
"strconv"
"strings"
"time"

rediscomponent "github.com/dapr/components-contrib/internal/component/redis"
Expand All @@ -27,13 +26,10 @@ import (
"github.com/dapr/kit/logger"
)

const (
unlockScript = "local v = redis.call(\"get\",KEYS[1]); if v==false then return -1 end; if v~=ARGV[1] then return -2 else return redis.call(\"del\",KEYS[1]) end"
connectedSlavesReplicas = "connected_slaves:"
infoReplicationDelimiter = "\r\n"
)
const unlockScript = `local v = redis.call("get",KEYS[1]); if v==false then return -1 end; if v~=ARGV[1] then return -2 else return redis.call("del",KEYS[1]) end`

// Standalone Redis lock store.Any fail-over related features are not supported,such as Sentinel and Redis Cluster.
// Standalone Redis lock store.
// Any fail-over related features are not supported, such as Sentinel and Redis Cluster.
type StandaloneRedisLock struct {
client rediscomponent.RedisClient
clientSettings *rediscomponent.Settings
Expand All @@ -52,83 +48,46 @@ func NewStandaloneRedisLock(logger logger.Logger) lock.Store {
}

// Init StandaloneRedisLock.
func (r *StandaloneRedisLock) InitLockStore(ctx context.Context, metadata lock.Metadata) error {
// must have `redisHost`
if metadata.Properties["redisHost"] == "" {
return fmt.Errorf("[standaloneRedisLock]: InitLockStore error. redisHost is empty")
}
// no failover
if needFailover(metadata.Properties) {
return fmt.Errorf("[standaloneRedisLock]: InitLockStore error. Failover is not supported")
}
// construct client
var err error
func (r *StandaloneRedisLock) InitLockStore(ctx context.Context, metadata lock.Metadata) (err error) {
// Create the client
r.client, r.clientSettings, err = rediscomponent.ParseClientFromProperties(metadata.Properties, contribMetadata.LockStoreType)
if err != nil {
return err
}
// 3. connect to redis
if _, err = r.client.PingResult(ctx); err != nil {
return fmt.Errorf("[standaloneRedisLock]: error connecting to redis at %s: %s", r.clientSettings.Host, err)
}
// no replica
replicas, err := r.getConnectedSlaves(ctx)
// pass the validation if error occurs,
// since some redis versions such as miniredis do not recognize the `INFO` command.
if err == nil && replicas > 0 {
return fmt.Errorf("[standaloneRedisLock]: InitLockStore error. Replication is not supported")
}
return nil
}

func needFailover(properties map[string]string) bool {
if val, ok := properties["failover"]; ok && val != "" {
parsedVal, err := strconv.ParseBool(val)
if err != nil {
return false
}
return parsedVal
// Ensure we have a host
if r.clientSettings.Host == "" {
return errors.New("metadata property redisHost is empty")
}
return false
}

func (r *StandaloneRedisLock) getConnectedSlaves(ctx context.Context) (int, error) {
res, err := r.client.DoRead(ctx, "INFO", "replication")
if err != nil {
return 0, err
// We do not support failover or having replicas
if r.clientSettings.Failover {
return errors.New("this component does not support connecting to Redis with failover")
}

// Response example: https://redis.io/commands/info#return-value
// # Replication\r\nrole:master\r\nconnected_slaves:1\r\n
s, _ := strconv.Unquote(fmt.Sprintf("%q", res))
if len(s) == 0 {
return 0, nil
// Ping Redis to ensure the connection is uo
if _, err = r.client.PingResult(ctx); err != nil {
return fmt.Errorf("error connecting to Redis: %v", err)
}

return r.parseConnectedSlaves(s), nil
}

func (r *StandaloneRedisLock) parseConnectedSlaves(res string) int {
infos := strings.Split(res, infoReplicationDelimiter)
for _, info := range infos {
if strings.Contains(info, connectedSlavesReplicas) {
parsedReplicas, _ := strconv.ParseUint(info[len(connectedSlavesReplicas):], 10, 32)

return int(parsedReplicas)
}
// Ensure there are no replicas
// Pass the validation if error occurs, since some Redis versions such as miniredis do not recognize the `INFO` command.
replicas, err := rediscomponent.GetConnectedSlaves(ctx, r.client)
if err == nil && replicas > 0 {
return errors.New("replication is not supported")
}

return 0
return nil
}

// Try to acquire a redis lock.
// TryLock tries to acquire a lock.
// If the lock cannot be acquired, it returns immediately.
func (r *StandaloneRedisLock) TryLock(ctx context.Context, req *lock.TryLockRequest) (*lock.TryLockResponse, error) {
// 1.Setting redis expiration time
// Set a key if doesn't exist with an expiration time
nxval, err := r.client.SetNX(ctx, req.ResourceID, req.LockOwner, time.Second*time.Duration(req.ExpiryInSeconds))
if nxval == nil {
return &lock.TryLockResponse{}, fmt.Errorf("[standaloneRedisLock]: SetNX returned nil.ResourceID: %s", req.ResourceID)
return &lock.TryLockResponse{}, fmt.Errorf("setNX returned a nil response")
}
// 2. check error

if err != nil {
return &lock.TryLockResponse{}, err
}
Expand All @@ -138,46 +97,46 @@ func (r *StandaloneRedisLock) TryLock(ctx context.Context, req *lock.TryLockRequ
}, nil
}

// Try to release a redis lock.
// Unlock tries to release a lock if the lock is still valid.
func (r *StandaloneRedisLock) Unlock(ctx context.Context, req *lock.UnlockRequest) (*lock.UnlockResponse, error) {
// 1. delegate to client.eval lua script
// Delegate to client.eval lua script
evalInt, parseErr, err := r.client.EvalInt(ctx, unlockScript, []string{req.ResourceID}, req.LockOwner)
// 2. check error
if evalInt == nil {
return newInternalErrorUnlockResponse(), fmt.Errorf("[standaloneRedisLock]: Eval unlock script returned nil.ResourceID: %s", req.ResourceID)
res := &lock.UnlockResponse{
Status: lock.InternalError,
}
return res, errors.New("eval unlock script returned a nil response")
}
// 3. parse result
i := *evalInt
status := lock.InternalError

// Parse result
if parseErr != nil {
return &lock.UnlockResponse{
Status: status,
Status: lock.InternalError,
}, err
}
if i >= 0 {
var status lock.Status
switch {
case *evalInt >= 0:
status = lock.Success
} else if i == -1 {
case *evalInt == -1:
status = lock.LockDoesNotExist
} else if i == -2 {
case *evalInt == -2:
status = lock.LockBelongsToOthers
default:
status = lock.InternalError
}

return &lock.UnlockResponse{
Status: status,
}, nil
}

func newInternalErrorUnlockResponse() *lock.UnlockResponse {
return &lock.UnlockResponse{
Status: lock.InternalError,
}
}

// Close shuts down the client's redis connections.
func (r *StandaloneRedisLock) Close() error {
if r.client != nil {
closeErr := r.client.Close()
err := r.client.Close()
r.client = nil
return closeErr
return err
}
return nil
}
Expand Down
82 changes: 39 additions & 43 deletions lock/redis/standalone_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@ package redis

import (
"context"
"sync"
"testing"

miniredis "github.com/alicebob/miniredis/v2"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/dapr/components-contrib/lock"
"github.com/dapr/components-contrib/metadata"
Expand Down Expand Up @@ -84,9 +84,10 @@ func TestStandaloneRedisLock_TryLock(t *testing.T) {
// 0. prepare
// start redis
s, err := miniredis.Run()
assert.NoError(t, err)
require.NoError(t, err)
defer s.Close()
// construct component

// Construct component
comp := NewStandaloneRedisLock(logger.NewLogger("test")).(*StandaloneRedisLock)
defer comp.Close()

Expand All @@ -95,59 +96,54 @@ func TestStandaloneRedisLock_TryLock(t *testing.T) {
}}
cfg.Properties["redisHost"] = s.Addr()
cfg.Properties["redisPassword"] = ""
// init

// Init
err = comp.InitLockStore(context.Background(), cfg)
assert.NoError(t, err)
require.NoError(t, err)

// 1. client1 trylock
ownerID1 := uuid.New().String()
resp, err := comp.TryLock(context.Background(), &lock.TryLockRequest{
ResourceID: resourceID,
LockOwner: ownerID1,
ExpiryInSeconds: 10,
})
assert.NoError(t, err)
require.NoError(t, err)
assert.True(t, resp.Success)
var wg sync.WaitGroup
wg.Add(1)

// 2. Client2 tryLock fail
go func() {
owner2 := uuid.New().String()
resp2, err2 := comp.TryLock(context.Background(), &lock.TryLockRequest{
ResourceID: resourceID,
LockOwner: owner2,
ExpiryInSeconds: 10,
})
assert.NoError(t, err2)
assert.False(t, resp2.Success)
wg.Done()
}()
wg.Wait()
// 3. client 1 unlock
owner2 := uuid.New().String()
resp, err = comp.TryLock(context.Background(), &lock.TryLockRequest{
ResourceID: resourceID,
LockOwner: owner2,
ExpiryInSeconds: 10,
})
require.NoError(t, err)
assert.False(t, resp.Success)

// 3. Client 1 unlock
unlockResp, err := comp.Unlock(context.Background(), &lock.UnlockRequest{
ResourceID: resourceID,
LockOwner: ownerID1,
})
assert.NoError(t, err)
require.NoError(t, err)
assert.True(t, unlockResp.Status == 0, "client1 failed to unlock!")
// 4. client 2 get lock
wg.Add(1)
go func() {
owner2 := uuid.New().String()
resp2, err2 := comp.TryLock(context.Background(), &lock.TryLockRequest{
ResourceID: resourceID,
LockOwner: owner2,
ExpiryInSeconds: 10,
})
assert.NoError(t, err2)
assert.True(t, resp2.Success, "client2 failed to get lock?!")
// 5. client2 unlock
unlockResp, err := comp.Unlock(context.Background(), &lock.UnlockRequest{
ResourceID: resourceID,
LockOwner: owner2,
})
assert.NoError(t, err)
assert.True(t, unlockResp.Status == 0, "client2 failed to unlock!")
wg.Done()
}()
wg.Wait()

// 4. Client 2 get lock
owner2 = uuid.New().String()
resp2, err2 := comp.TryLock(context.Background(), &lock.TryLockRequest{
ResourceID: resourceID,
LockOwner: owner2,
ExpiryInSeconds: 10,
})
require.NoError(t, err2)
assert.True(t, resp2.Success, "client2 failed to get lock?!")

// 5. client2 unlock
unlockResp, err = comp.Unlock(context.Background(), &lock.UnlockRequest{
ResourceID: resourceID,
LockOwner: owner2,
})
require.NoError(t, err)
assert.True(t, unlockResp.Status == 0, "client2 failed to unlock!")
}

0 comments on commit 659e239

Please sign in to comment.