Skip to content

Commit

Permalink
fix(redis): use lua scripts to ensure atomic manipulation
Browse files Browse the repository at this point in the history
  • Loading branch information
justlorain committed Sep 1, 2023
1 parent 5d40e80 commit 64beee9
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 10 deletions.
2 changes: 1 addition & 1 deletion redis/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ const (
)

const (
defaultExpireTime = time.Second * 60
defaultExpireTime = 60
defaultTickerTime = time.Second * 30
defaultKeepAliveTime = time.Second * 60
defaultMonitorTime = time.Second * 30
Expand Down
52 changes: 45 additions & 7 deletions redis/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,20 @@ func (r *redisRegistry) Register(info *registry.Info) error {
}
r.mu.Lock()
r.rctx = &rctx
rdb.HSet(rctx.ctx, hash.key, hash.field, hash.value)
rdb.Expire(rctx.ctx, hash.key, defaultExpireTime)
rdb.Publish(rctx.ctx, hash.key, generateMsg(register, info.ServiceName, info.Addr.String()))
r.mu.Unlock()
keys := []string{
hash.key,
}
args := []interface{}{
hash.field,
hash.value,
defaultExpireTime,
generateMsg(register, info.ServiceName, info.Addr.String()),
}
err = registerScript.Run(rctx.ctx, rdb, keys, args).Err()
if err != nil && err != redis.Nil {
return err
}
go m.monitorTTL(rctx.ctx, hash, info, r)
go keepAlive(rctx.ctx, hash, r)
return nil
Expand All @@ -88,10 +98,38 @@ func (r *redisRegistry) Deregister(info *registry.Info) error {
if err != nil {
return err
}
r.mu.Lock()
rdb.HDel(rctx.ctx, hash.key, hash.field)
rdb.Publish(rctx.ctx, hash.key, generateMsg(deregister, info.ServiceName, info.Addr.String()))
keys := []string{
hash.key,
}
args := []interface{}{
hash.field,
generateMsg(deregister, info.ServiceName, info.Addr.String()),
}
err = deregisterScript.Run(rctx.ctx, rdb, keys, args).Err()
if err != nil && err != redis.Nil {
return err
}
rctx.cancel()
r.mu.Unlock()
return nil
}

var registerScript = redis.NewScript(`
local key = KEYS[1]
local field = ARGV[1]
local value = ARGV[2]
local expireTime = tonumber(ARGV[3])
local message = ARGV[4]
redis.call('HSET', key, field, value)
redis.call('EXPIRE', key, expireTime)
redis.call('PUBLISH', key, message)
`)

var deregisterScript = redis.NewScript(`
local key = KEYS[1]
local field = ARGV[1]
local message = ARGV[2]
redis.call('HDEL', key, field)
redis.call('PUBLISH', key, message)
`)
4 changes: 2 additions & 2 deletions redis/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ package redis

import (
"context"
"encoding/json"

"github.com/bytedance/sonic"
"github.com/cloudwego/hertz/pkg/app/client/discovery"
"github.com/cloudwego/hertz/pkg/common/hlog"
"github.com/go-redis/redis/v8"
Expand Down Expand Up @@ -51,7 +51,7 @@ func (r *redisResolver) Resolve(ctx context.Context, desc string) (discovery.Res
var its []discovery.Instance
for f, v := range fvs {
var ri registryInfo
err := json.Unmarshal([]byte(v), &ri)
err := sonic.Unmarshal([]byte(v), &ri)
if err != nil {
hlog.Warnf("HERTZ: fail to unmarshal with err: %v, ignore instance Addr: %v", err, f)
continue
Expand Down

0 comments on commit 64beee9

Please sign in to comment.