Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Some fixes for Redis Lock component #3044

Merged
merged 2 commits into from
Aug 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions internal/component/redis/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,33 @@ func GetServerVersion(c RedisClient) (string, error) {
return "", fmt.Errorf("could not find redis_version in redis info response")
}

// GetConnectedSlaves returns the number of slaves connected to the Redis master.
func GetConnectedSlaves(ctx context.Context, c RedisClient) (int, error) {
const connectedSlavesReplicas = "connected_slaves:"

res, err := c.DoRead(ctx, "INFO", "replication")
if err != nil {
return 0, err
}

// Response example: https://redis.io/commands/info#return-value
// # Replication\r\nrole:master\r\nconnected_slaves:1\r\n
s, _ := strconv.Unquote(fmt.Sprintf("%q", res))
if len(s) == 0 {
return 0, nil
}

infos := strings.Split(s, "\r\n")
for _, info := range infos {
if strings.HasPrefix(info, connectedSlavesReplicas) {
parsedReplicas, _ := strconv.ParseInt(info[len(connectedSlavesReplicas):], 10, 32)
return int(parsedReplicas), nil
}
}

return 0, nil
}

type RedisError string

func (e RedisError) Error() string { return string(e) }
Expand Down
131 changes: 45 additions & 86 deletions lock/redis/standalone.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,9 @@ package redis

import (
"context"
"errors"
"fmt"
"reflect"
"strconv"
"strings"
"time"

rediscomponent "github.com/dapr/components-contrib/internal/component/redis"
Expand All @@ -27,13 +26,10 @@ import (
"github.com/dapr/kit/logger"
)

const (
unlockScript = "local v = redis.call(\"get\",KEYS[1]); if v==false then return -1 end; if v~=ARGV[1] then return -2 else return redis.call(\"del\",KEYS[1]) end"
connectedSlavesReplicas = "connected_slaves:"
infoReplicationDelimiter = "\r\n"
)
const unlockScript = `local v = redis.call("get",KEYS[1]); if v==false then return -1 end; if v~=ARGV[1] then return -2 else return redis.call("del",KEYS[1]) end`

// Standalone Redis lock store.Any fail-over related features are not supported,such as Sentinel and Redis Cluster.
// Standalone Redis lock store.
// Any fail-over related features are not supported, such as Sentinel and Redis Cluster.
type StandaloneRedisLock struct {
client rediscomponent.RedisClient
clientSettings *rediscomponent.Settings
Expand All @@ -52,83 +48,46 @@ func NewStandaloneRedisLock(logger logger.Logger) lock.Store {
}

// Init StandaloneRedisLock.
func (r *StandaloneRedisLock) InitLockStore(ctx context.Context, metadata lock.Metadata) error {
// must have `redisHost`
if metadata.Properties["redisHost"] == "" {
return fmt.Errorf("[standaloneRedisLock]: InitLockStore error. redisHost is empty")
}
// no failover
if needFailover(metadata.Properties) {
return fmt.Errorf("[standaloneRedisLock]: InitLockStore error. Failover is not supported")
}
// construct client
var err error
func (r *StandaloneRedisLock) InitLockStore(ctx context.Context, metadata lock.Metadata) (err error) {
// Create the client
r.client, r.clientSettings, err = rediscomponent.ParseClientFromProperties(metadata.Properties, contribMetadata.LockStoreType)
if err != nil {
return err
}
// 3. connect to redis
if _, err = r.client.PingResult(ctx); err != nil {
return fmt.Errorf("[standaloneRedisLock]: error connecting to redis at %s: %s", r.clientSettings.Host, err)
}
// no replica
replicas, err := r.getConnectedSlaves(ctx)
// pass the validation if error occurs,
// since some redis versions such as miniredis do not recognize the `INFO` command.
if err == nil && replicas > 0 {
return fmt.Errorf("[standaloneRedisLock]: InitLockStore error. Replication is not supported")
}
return nil
}

func needFailover(properties map[string]string) bool {
if val, ok := properties["failover"]; ok && val != "" {
parsedVal, err := strconv.ParseBool(val)
if err != nil {
return false
}
return parsedVal
// Ensure we have a host
if r.clientSettings.Host == "" {
return errors.New("metadata property redisHost is empty")
}
return false
}

func (r *StandaloneRedisLock) getConnectedSlaves(ctx context.Context) (int, error) {
res, err := r.client.DoRead(ctx, "INFO", "replication")
if err != nil {
return 0, err
// We do not support failover or having replicas
if r.clientSettings.Failover {
return errors.New("this component does not support connecting to Redis with failover")
}

// Response example: https://redis.io/commands/info#return-value
// # Replication\r\nrole:master\r\nconnected_slaves:1\r\n
s, _ := strconv.Unquote(fmt.Sprintf("%q", res))
if len(s) == 0 {
return 0, nil
// Ping Redis to ensure the connection is uo
if _, err = r.client.PingResult(ctx); err != nil {
return fmt.Errorf("error connecting to Redis: %v", err)
}

return r.parseConnectedSlaves(s), nil
}

func (r *StandaloneRedisLock) parseConnectedSlaves(res string) int {
infos := strings.Split(res, infoReplicationDelimiter)
for _, info := range infos {
if strings.Contains(info, connectedSlavesReplicas) {
parsedReplicas, _ := strconv.ParseUint(info[len(connectedSlavesReplicas):], 10, 32)

return int(parsedReplicas)
}
// Ensure there are no replicas
// Pass the validation if error occurs, since some Redis versions such as miniredis do not recognize the `INFO` command.
replicas, err := rediscomponent.GetConnectedSlaves(ctx, r.client)
if err == nil && replicas > 0 {
return errors.New("replication is not supported")
}

return 0
return nil
}

// Try to acquire a redis lock.
// TryLock tries to acquire a lock.
// If the lock cannot be acquired, it returns immediately.
func (r *StandaloneRedisLock) TryLock(ctx context.Context, req *lock.TryLockRequest) (*lock.TryLockResponse, error) {
// 1.Setting redis expiration time
// Set a key if doesn't exist with an expiration time
nxval, err := r.client.SetNX(ctx, req.ResourceID, req.LockOwner, time.Second*time.Duration(req.ExpiryInSeconds))
if nxval == nil {
return &lock.TryLockResponse{}, fmt.Errorf("[standaloneRedisLock]: SetNX returned nil.ResourceID: %s", req.ResourceID)
return &lock.TryLockResponse{}, fmt.Errorf("setNX returned a nil response")
}
// 2. check error

if err != nil {
return &lock.TryLockResponse{}, err
}
Expand All @@ -138,46 +97,46 @@ func (r *StandaloneRedisLock) TryLock(ctx context.Context, req *lock.TryLockRequ
}, nil
}

// Try to release a redis lock.
// Unlock tries to release a lock if the lock is still valid.
func (r *StandaloneRedisLock) Unlock(ctx context.Context, req *lock.UnlockRequest) (*lock.UnlockResponse, error) {
// 1. delegate to client.eval lua script
// Delegate to client.eval lua script
evalInt, parseErr, err := r.client.EvalInt(ctx, unlockScript, []string{req.ResourceID}, req.LockOwner)
// 2. check error
if evalInt == nil {
return newInternalErrorUnlockResponse(), fmt.Errorf("[standaloneRedisLock]: Eval unlock script returned nil.ResourceID: %s", req.ResourceID)
res := &lock.UnlockResponse{
Status: lock.InternalError,
}
return res, errors.New("eval unlock script returned a nil response")
}
// 3. parse result
i := *evalInt
status := lock.InternalError

// Parse result
if parseErr != nil {
return &lock.UnlockResponse{
Status: status,
Status: lock.InternalError,
}, err
}
if i >= 0 {
var status lock.Status
switch {
case *evalInt >= 0:
status = lock.Success
} else if i == -1 {
case *evalInt == -1:
status = lock.LockDoesNotExist
} else if i == -2 {
case *evalInt == -2:
status = lock.LockBelongsToOthers
default:
status = lock.InternalError
}

return &lock.UnlockResponse{
Status: status,
}, nil
}

func newInternalErrorUnlockResponse() *lock.UnlockResponse {
return &lock.UnlockResponse{
Status: lock.InternalError,
}
}

// Close shuts down the client's redis connections.
func (r *StandaloneRedisLock) Close() error {
if r.client != nil {
closeErr := r.client.Close()
err := r.client.Close()
r.client = nil
return closeErr
return err
}
return nil
}
Expand Down
82 changes: 39 additions & 43 deletions lock/redis/standalone_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@ package redis

import (
"context"
"sync"
"testing"

miniredis "github.com/alicebob/miniredis/v2"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/dapr/components-contrib/lock"
"github.com/dapr/components-contrib/metadata"
Expand Down Expand Up @@ -84,9 +84,10 @@ func TestStandaloneRedisLock_TryLock(t *testing.T) {
// 0. prepare
// start redis
s, err := miniredis.Run()
assert.NoError(t, err)
require.NoError(t, err)
defer s.Close()
// construct component

// Construct component
comp := NewStandaloneRedisLock(logger.NewLogger("test")).(*StandaloneRedisLock)
defer comp.Close()

Expand All @@ -95,59 +96,54 @@ func TestStandaloneRedisLock_TryLock(t *testing.T) {
}}
cfg.Properties["redisHost"] = s.Addr()
cfg.Properties["redisPassword"] = ""
// init

// Init
err = comp.InitLockStore(context.Background(), cfg)
assert.NoError(t, err)
require.NoError(t, err)

// 1. client1 trylock
ownerID1 := uuid.New().String()
resp, err := comp.TryLock(context.Background(), &lock.TryLockRequest{
ResourceID: resourceID,
LockOwner: ownerID1,
ExpiryInSeconds: 10,
})
assert.NoError(t, err)
require.NoError(t, err)
assert.True(t, resp.Success)
var wg sync.WaitGroup
wg.Add(1)

// 2. Client2 tryLock fail
go func() {
owner2 := uuid.New().String()
resp2, err2 := comp.TryLock(context.Background(), &lock.TryLockRequest{
ResourceID: resourceID,
LockOwner: owner2,
ExpiryInSeconds: 10,
})
assert.NoError(t, err2)
assert.False(t, resp2.Success)
wg.Done()
}()
wg.Wait()
// 3. client 1 unlock
owner2 := uuid.New().String()
resp, err = comp.TryLock(context.Background(), &lock.TryLockRequest{
ResourceID: resourceID,
LockOwner: owner2,
ExpiryInSeconds: 10,
})
require.NoError(t, err)
assert.False(t, resp.Success)

// 3. Client 1 unlock
unlockResp, err := comp.Unlock(context.Background(), &lock.UnlockRequest{
ResourceID: resourceID,
LockOwner: ownerID1,
})
assert.NoError(t, err)
require.NoError(t, err)
assert.True(t, unlockResp.Status == 0, "client1 failed to unlock!")
// 4. client 2 get lock
wg.Add(1)
go func() {
owner2 := uuid.New().String()
resp2, err2 := comp.TryLock(context.Background(), &lock.TryLockRequest{
ResourceID: resourceID,
LockOwner: owner2,
ExpiryInSeconds: 10,
})
assert.NoError(t, err2)
assert.True(t, resp2.Success, "client2 failed to get lock?!")
// 5. client2 unlock
unlockResp, err := comp.Unlock(context.Background(), &lock.UnlockRequest{
ResourceID: resourceID,
LockOwner: owner2,
})
assert.NoError(t, err)
assert.True(t, unlockResp.Status == 0, "client2 failed to unlock!")
wg.Done()
}()
wg.Wait()

// 4. Client 2 get lock
owner2 = uuid.New().String()
resp2, err2 := comp.TryLock(context.Background(), &lock.TryLockRequest{
ResourceID: resourceID,
LockOwner: owner2,
ExpiryInSeconds: 10,
})
require.NoError(t, err2)
assert.True(t, resp2.Success, "client2 failed to get lock?!")

// 5. client2 unlock
unlockResp, err = comp.Unlock(context.Background(), &lock.UnlockRequest{
ResourceID: resourceID,
LockOwner: owner2,
})
require.NoError(t, err)
assert.True(t, unlockResp.Status == 0, "client2 failed to unlock!")
}
Loading