merge remote and add windows platform support
vinllen committed Sep 4, 2019
2 parents 30447be + 164cd78 commit 6540679
Showing 13 changed files with 112 additions and 98 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -15,6 +15,7 @@ result.db.*
bin/*
conf/*
!conf/redis-shake.conf
!.circleci/config.yml

dump.data
runtime.trace
11 changes: 11 additions & 0 deletions ChangeLog
@@ -1,3 +1,14 @@
2019-09-04 Alibaba Cloud.
* VERSION: 1.6.18
* BUGFIX: restore quicklist panic when target is cluster. see #156
2019-08-27 Alibaba Cloud.
* VERSION: 1.6.17
* BUGFIX: transaction syncing panic when target redis is cluster. see #145.
* IMPROVE: adjust RecvChanSize based on `sender.count` or `scan.key_number`
if target redis type is cluster.
* IMPROVE: remove some variables in conf like `heartbeat`, `ncpu`.
* IMPROVE: print inner error message from redigo driver return message.
2019-08-09 Alibaba Cloud.
* VERSION: 1.6.16
* BUGFIX: big key in `rump` mode all expired.
4 changes: 3 additions & 1 deletion README.md
@@ -17,7 +17,7 @@ The type can be one of the followings:<br>

* **decode**: Decode dumped payload to human readable format (hex-encoding).
* **restore**: Restore RDB file to target redis.
* **dump**: Dump RDB file from souce redis.
* **dump**: Dump RDB file from source redis.
* **sync**: Sync data from source redis to target redis by `sync` or `psync` command. Including full synchronization and incremental synchronization.
* **rump**: Sync data from source redis to target redis by `scan` command. Only support full synchronization. Plus, RedisShake also supports fetching data from given keys in the input file when `scan` command is not supported on the source side. This mode is usually used when `sync` and `psync` redis commands aren't supported.

@@ -99,3 +99,5 @@ Plus, we have a WeChat group so that users can join and discuss, but the group u
| :------: | :------: |
| ceshihao | [email protected] |
| wangyiyang | [email protected] |
| muicoder | [email protected] |
| zhklcf | [email protected] |
34 changes: 6 additions & 28 deletions conf/redis-shake.conf
@@ -19,9 +19,6 @@ system_profile = 9310
# restful port, used to view metrics. -1 means disabled. In `restore` mode the process only exits after the RDB restore finishes when this is set to -1; otherwise it blocks forever.
http_profile = 9320

# runtime.GOMAXPROCS, 0 means use cpu core number: runtime.NumCPU()
ncpu = 0

# parallel routines number used in RDB file syncing. default is 64.
# number of concurrent routines started to sync one RDB file.
parallel = 32
@@ -41,7 +38,8 @@ source.type = standalone
# 2. ${sentinel_master_name}:${master or slave}@sentinel single/cluster address, e.g., mymaster:[email protected]:26379;127.0.0.1:26380, or @127.0.0.1:26379;127.0.0.1:26380. for "sentinel" type.
# 3. cluster that has several db nodes split by semicolon(;). for "cluster" type. e.g., 10.1.1.1:20331;10.1.1.2:20441.
# 4. proxy address(used in "rump" mode only). for "proxy" type.
# Source redis address. For sentinel mode, the format is "master name:role to pull (master or slave)@sentinel address".
# Source redis address. For sentinel or open-source cluster mode, the format is "master name:role to pull (master or slave)@sentinel address"; for other cluster
# architectures such as codis, twemproxy, or aliyun proxy, the db addresses of all masters or slaves must be configured.
source.address = 127.0.0.1:20441
# password of db/proxy. even if type is sentinel.
source.password_raw = 123456
@@ -165,31 +163,16 @@ metric = true
# whether to print metrics to the log
metric.print_log = false

# heartbeat
# send heartbeat to this url
# used in `sync`.
# URL that redis-shake sends heartbeats to
#heartbeat.url = http://127.0.0.1:8000
heartbeat.url =
# interval by seconds
# heartbeat keep-alive interval
heartbeat.interval = 3
# external info which will be included in heartbeat data.
# extra information appended to the heartbeat message
heartbeat.external = test external
# local network card to get ip address, e.g., "lo", "eth0", "en0"
# network card used to obtain the local ip
heartbeat.network_interface =

# sender information.
# sender flush buffer size of byte.
# used in `sync`.
# size in bytes of the send buffer; once it exceeds this threshold the buffer is flushed and sent
sender.size = 104857600
# sender flush buffer size of oplog number.
# used in `sync`.
# number of oplogs in the send buffer; once it exceeds this threshold the buffer is flushed and sent
sender.count = 5000
# used in `sync`. flush sender buffer when bigger than this threshold.
# number of oplogs in the send buffer; once it exceeds this threshold the buffer is flushed and sent. When the target
# is a cluster, raising this value consumes extra memory.
sender.count = 4096
# delay channel size. once one oplog is sent to target redis, the oplog id and timestamp will also
# stored in this delay queue. this timestamp will be used to calculate the time delay when receiving
# ack from target redis.
@@ -207,13 +190,11 @@ keep_alive = 0
# number of keys captured each time. default is 100.
# number of keys scanned per iteration; defaults to 100 when unset.
scan.key_number = 50

# used in `rump`.
# we support some special redis types that don't use default `scan` command like alibaba cloud and tencent cloud.
# some builds use a special format that differs from the normal scan command, so we adapt to them. Currently Tencent Cloud cluster ("tencent_cluster")
# and Alibaba Cloud cluster ("aliyun_cluster") are supported.
scan.special_cloud =

# used in `rump`.
# we also support fetching data from a given file that lists the keys.
# some cloud builds support neither sync/psync nor scan; for those, the full key list can be read from a file and fetched, one key per line.
@@ -229,6 +210,3 @@ qps = 200000
# replace hash tag.
# used in `sync`.
replace_hash_tag = false

# used in `restore` and `dump`.
extra = false
6 changes: 3 additions & 3 deletions src/pkg/rdb/loader.go
@@ -196,9 +196,9 @@ func (l *Loader) NextBinEntry() (*BinEntry, error) {
} else {
key = l.lastEntry.Key
}
// log.Infof("l %p r %p", l, l.rdbReader)
// log.Info("remainMember:", l.remainMember, " key:", string(key[:]), " type:", t)
// log.Info("r.remainMember:", l.rdbReader.remainMember)
//log.Debugf("l %p r %p", l, l.rdbReader)
//log.Debug("remainMember:", l.remainMember, " key:", string(key[:]), " type:", t)
//log.Debug("r.remainMember:", l.rdbReader.remainMember)
val, err := l.readObjectValue(t, l)
if err != nil {
return nil, err
4 changes: 3 additions & 1 deletion src/pkg/rdb/reader.go
@@ -13,7 +13,7 @@ import (
"strconv"

"pkg/libs/errors"
// "libs/log"

)

var FromVersion int64 = 9
@@ -144,10 +144,12 @@ func (r *rdbReader) readObjectValue(t byte, l *Loader) ([]byte, error) {
if n, err := r.ReadLength(); err != nil {
return nil, err
} else {
// log.Debug("zset length: ", n)
for i := 0; i < int(n); i++ {
if _, err := r.ReadString(); err != nil {
return nil, err
}
// log.Debug("zset read: ", i)
if t == RdbTypeZSet2 {
if _, err := r.ReadDouble(); err != nil {
return nil, err
7 changes: 6 additions & 1 deletion src/redis-shake/common/cluster.go
@@ -7,7 +7,7 @@ import (
"pkg/libs/log"
)

const(
var (
RecvChanSize = 4096
)

@@ -61,6 +61,11 @@ func (cc *ClusterConn) Send(commandName string, args ...interface{}) error {

// send batcher and put the return into recvChan
func (cc *ClusterConn) Flush() error {
if cc.batcher == nil {
log.Info("batcher is empty, no need to flush")
return nil
}

ret, err := cc.client.RunBatch(cc.batcher)
defer func() {
cc.batcher = nil // reset batcher
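
A quick illustration of why the new guard in Flush matters: before this change, calling Flush without any queued Send handed a nil batcher to RunBatch. The sketch below reproduces only that shape with stand-in types — `batch` and `conn` are hypothetical, not redis-shake's real ClusterConn or the redis-go-cluster batcher — assuming the batcher is created lazily on the first Send and reset to nil after every flush, as the diff suggests.

```go
package main

import "fmt"

// batch and conn are hypothetical stand-ins; only the lazy-create /
// nil-guard pattern is taken from the diff above.
type batch struct{ cmds []string }

type conn struct{ batcher *batch }

// Send queues one command, creating the batcher lazily on first use.
func (c *conn) Send(cmd string) {
	if c.batcher == nil {
		c.batcher = &batch{}
	}
	c.batcher.cmds = append(c.batcher.cmds, cmd)
}

// Flush sends whatever is queued. With the guard, flushing an empty
// connection is a harmless no-op instead of handing nil to the batch runner.
func (c *conn) Flush() error {
	if c.batcher == nil {
		fmt.Println("batcher is empty, no need to flush")
		return nil
	}
	fmt.Printf("flushing %d queued commands\n", len(c.batcher.cmds))
	c.batcher = nil // reset batcher, as the deferred func in the diff does
	return nil
}

func main() {
	c := &conn{}
	_ = c.Flush() // safe no-op
	c.Send("SET k v")
	_ = c.Flush() // flushes one command
}
```
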
5 changes: 5 additions & 0 deletions src/redis-shake/common/command.go
@@ -10,6 +10,11 @@ import (
"strings"
)

const (
ReplayString = "string"
ReplayInt64s = "int64s"
)

type ClusterNodeInfo struct {
Id string
Address string
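
The two constants added above read like reply-type tags ("string" vs "int64s"). Purely as an assumption about how they might be consumed — the actual call sites are not part of this diff — here is a hypothetical dispatcher that picks a redigo decoder based on such a tag.

```go
package main

import (
	"fmt"

	"github.com/garyburd/redigo/redis"
)

// Mirrors the constants added in command.go.
const (
	ReplayString = "string"
	ReplayInt64s = "int64s"
)

// decodeReply is hypothetical: one possible way a reply-type tag could
// select the redigo conversion for a raw reply.
func decodeReply(kind string, reply interface{}) (interface{}, error) {
	switch kind {
	case ReplayString:
		s, err := redis.String(reply, nil)
		return s, err
	case ReplayInt64s:
		v, err := redis.Int64s(reply, nil)
		return v, err
	default:
		return nil, fmt.Errorf("unknown reply kind: %s", kind)
	}
}

func main() {
	v, err := decodeReply(ReplayString, []byte("OK"))
	fmt.Println(v, err) // OK <nil>
}
```
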
2 changes: 2 additions & 0 deletions src/redis-shake/common/utils.go
@@ -811,6 +811,8 @@ func RestoreRdbEntry(c redigo.Conn, e *rdb.BinEntry) {
params = append(params, "FREQ")
params = append(params, e.Freq)
}

log.Debugf("restore key[%s] with params[%v]", e.Key, params)
// fmt.Printf("key: %v, value: %v params: %v\n", string(e.Key), e.Value, params)
// s, err := redigo.String(c.Do("restore", params...))
s, err := redigoCluster.String(c.Do("restore", params...))
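
For context on the path the new log.Debugf traces: RestoreRdbEntry ends up issuing a RESTORE with the assembled params (key, ttl, payload, plus extras such as FREQ when present). The standalone sketch below shows the equivalent DUMP/RESTORE round trip with plain redigo; the addresses, key name, and fixed TTL are placeholders, and the real code goes through redigoCluster rather than a single connection.

```go
package main

import (
	"log"

	"github.com/garyburd/redigo/redis"
)

func main() {
	// Placeholder source and target addresses.
	src, err := redis.Dial("tcp", "127.0.0.1:6379")
	if err != nil {
		log.Fatal(err)
	}
	defer src.Close()

	dst, err := redis.Dial("tcp", "127.0.0.1:6380")
	if err != nil {
		log.Fatal(err)
	}
	defer dst.Close()

	// DUMP yields the key's serialized RDB payload.
	payload, err := redis.Bytes(src.Do("DUMP", "mykey"))
	if err != nil {
		log.Fatalf("dump failed: %v", err)
	}

	// RESTORE key ttl payload [REPLACE]; a ttl of 0 means no expiry.
	params := []interface{}{"mykey", 0, payload, "REPLACE"}
	log.Printf("restore key[%s] with params[%v]", "mykey", params)
	reply, err := redis.String(dst.Do("RESTORE", params...))
	if err != nil {
		log.Fatalf("restore failed: %v", err)
	}
	log.Printf("restore reply: %s", reply)
}
```
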
108 changes: 54 additions & 54 deletions src/redis-shake/configure/configure.go
@@ -4,65 +4,65 @@ import "time"

type Configuration struct {
// config file variables
Id string `config:"id"`
LogFile string `config:"log.file"`
LogLevel string `config:"log.level"`
SystemProfile int `config:"system_profile"`
HttpProfile int `config:"http_profile"`
Id string `config:"id"`
LogFile string `config:"log.file"`
LogLevel string `config:"log.level"`
SystemProfile int `config:"system_profile"`
HttpProfile int `config:"http_profile"`
Parallel int `config:"parallel"`
SourceType string `config:"source.type"`
SourceAddress string `config:"source.address"`
SourcePasswordRaw string `config:"source.password_raw"`
SourcePasswordEncoding string `config:"source.password_encoding"`
SourceAuthType string `config:"source.auth_type"`
SourceTLSEnable bool `config:"source.tls_enable"`
SourceRdbInput []string `config:"source.rdb.input"`
SourceRdbParallel int `config:"source.rdb.parallel"`
SourceRdbSpecialCloud string `config:"source.rdb.special_cloud"`
TargetAddress string `config:"target.address"`
TargetPasswordRaw string `config:"target.password_raw"`
TargetPasswordEncoding string `config:"target.password_encoding"`
TargetDBString string `config:"target.db"`
TargetAuthType string `config:"target.auth_type"`
TargetType string `config:"target.type"`
TargetTLSEnable bool `config:"target.tls_enable"`
TargetRdbOutput string `config:"target.rdb.output"`
TargetVersion string `config:"target.version"`
FakeTime string `config:"fake_time"`
Rewrite bool `config:"rewrite"`
FilterDBWhitelist []string `config:"filter.db.whitelist"`
FilterDBBlacklist []string `config:"filter.db.blacklist"`
FilterKeyWhitelist []string `config:"filter.key.whitelist"`
FilterKeyBlacklist []string `config:"filter.key.blacklist"`
FilterSlot []string `config:"filter.slot"`
FilterLua bool `config:"filter.lua"`
BigKeyThreshold uint64 `config:"big_key_threshold"`
Psync bool `config:"psync"`
Metric bool `config:"metric"`
MetricPrintLog bool `config:"metric.print_log"`
SenderSize uint64 `config:"sender.size"`
SenderCount uint `config:"sender.count"`
SenderDelayChannelSize uint `config:"sender.delay_channel_size"`
KeepAlive uint `config:"keep_alive"`
PidPath string `config:"pid_path"`
ScanKeyNumber uint32 `config:"scan.key_number"`
ScanSpecialCloud string `config:"scan.special_cloud"`
ScanKeyFile string `config:"scan.key_file"`
Qps int `config:"qps"`

/*---------------------------------------------------------*/
// inner variables
NCpu int `config:"ncpu"`
Parallel int `config:"parallel"`
SourceType string `config:"source.type"`
SourceAddress string `config:"source.address"`
SourcePasswordRaw string `config:"source.password_raw"`
SourcePasswordEncoding string `config:"source.password_encoding"`
SourceAuthType string `config:"source.auth_type"`
SourceTLSEnable bool `config:"source.tls_enable"`
SourceRdbInput []string `config:"source.rdb.input"`
SourceRdbParallel int `config:"source.rdb.parallel"`
SourceRdbSpecialCloud string `config:"source.rdb.special_cloud"`
TargetAddress string `config:"target.address"`
TargetPasswordRaw string `config:"target.password_raw"`
TargetPasswordEncoding string `config:"target.password_encoding"`
TargetDBString string `config:"target.db"`
TargetAuthType string `config:"target.auth_type"`
TargetType string `config:"target.type"`
TargetTLSEnable bool `config:"target.tls_enable"`
TargetRdbOutput string `config:"target.rdb.output"`
TargetVersion string `config:"target.version"`
FakeTime string `config:"fake_time"`
Rewrite bool `config:"rewrite"`
FilterDBWhitelist []string `config:"filter.db.whitelist"`
FilterDBBlacklist []string `config:"filter.db.blacklist"`
FilterKeyWhitelist []string `config:"filter.key.whitelist"`
FilterKeyBlacklist []string `config:"filter.key.blacklist"`
FilterSlot []string `config:"filter.slot"`
FilterLua bool `config:"filter.lua"`
BigKeyThreshold uint64 `config:"big_key_threshold"`
Psync bool `config:"psync"`
Metric bool `config:"metric"`
MetricPrintLog bool `config:"metric.print_log"`
HeartbeatUrl string `config:"heartbeat.url"`
HeartbeatInterval uint `config:"heartbeat.interval"`
HeartbeatExternal string `config:"heartbeat.external"`
HeartbeatNetworkInterface string `config:"heartbeat.network_interface"`
SenderSize uint64 `config:"sender.size"`
SenderCount uint `config:"sender.count"`
SenderDelayChannelSize uint `config:"sender.delay_channel_size"`
KeepAlive uint `config:"keep_alive"`
PidPath string `config:"pid_path"`
ScanKeyNumber uint32 `config:"scan.key_number"`
ScanSpecialCloud string `config:"scan.special_cloud"`
ScanKeyFile string `config:"scan.key_file"`
Qps int `config:"qps"`

/*---------------------------------------------------------*/
// inner variables
ReplaceHashTag bool `config:"replace_hash_tag"`
ExtraInfo bool `config:"extra"`
SockFileName string `config:"sock.file_name"`
SockFileSize uint `config:"sock.file_size"`
FilterKey []string `config:"filter.key"` // compatible with older versions
FilterDB string `config:"filter.db"` // compatible with older versions
ReplaceHashTag bool `config:"replace_hash_tag"`
ExtraInfo bool `config:"extra"`
SockFileName string `config:"sock.file_name"`
SockFileSize uint `config:"sock.file_size"`
FilterKey []string `config:"filter.key"` // compatible with older versions
FilterDB string `config:"filter.db"` // compatible with older versions

/*---------------------------------------------------------*/
// generated variables
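
The Configuration struct maps every conf key to a field through a `config:"..."` struct tag. As a minimal sketch of that idea only — the loader redis-shake actually uses is not part of this diff and may work differently — such tags can be enumerated with reflection:

```go
package main

import (
	"fmt"
	"reflect"
)

// A trimmed copy of a few fields from the struct above.
type Configuration struct {
	Id          string `config:"id"`
	Parallel    int    `config:"parallel"`
	SenderCount uint   `config:"sender.count"`
}

func main() {
	t := reflect.TypeOf(Configuration{})
	for i := 0; i < t.NumField(); i++ {
		f := t.Field(i)
		// Each config key is read straight from the struct tag.
		fmt.Printf("conf key %q -> field %s (%s)\n", f.Tag.Get("config"), f.Name, f.Type)
	}
}
```
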
9 changes: 7 additions & 2 deletions src/redis-shake/main/main.go
@@ -397,6 +397,10 @@ func sanitizeOptions(tp string) error {
// set to default when not set
conf.Options.SenderCount = defaultSenderCount
}
if conf.Options.TargetType == conf.RedisTypeCluster && int(conf.Options.SenderCount) > utils.RecvChanSize {
log.Infof("RecvChanSize is modified from [%v] to [%v]", utils.RecvChanSize, int(conf.Options.SenderCount))
utils.RecvChanSize = int(conf.Options.SenderCount)
}

if conf.Options.SenderDelayChannelSize == 0 {
conf.Options.SenderDelayChannelSize = 32
@@ -449,8 +453,9 @@ func sanitizeOptions(tp string) error {
conf.Options.ScanSpecialCloud, conf.Options.ScanKeyFile)
}

if conf.Options.ScanKeyNumber > utils.RecvChanSize && conf.Options.TargetType == conf.RedisTypeCluster {
return fmt.Errorf("scan.key_number should less than [%v] when target type is cluster", utils.RecvChanSize)
if int(conf.Options.ScanKeyNumber) > utils.RecvChanSize && conf.Options.TargetType == conf.RedisTypeCluster {
log.Infof("RecvChanSize is modified from [%v] to [%v]", utils.RecvChanSize, int(conf.Options.ScanKeyNumber))
utils.RecvChanSize = int(conf.Options.ScanKeyNumber)
}

//if len(conf.Options.SourceAddressList) == 1 {
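
The two hunks above change sanitizeOptions from rejecting oversized batches to growing utils.RecvChanSize, so that when the target is a cluster the receive channel can hold a full sender.count flush or a full scan.key_number batch. A hedged, standalone restatement of that sizing rule — the function name and plain-int signature are illustrative, not redis-shake's API:

```go
package main

import "fmt"

// adjustRecvChanSize restates the rule with plain ints: for a cluster target,
// the channel is enlarged to the larger of sender.count and scan.key_number.
func adjustRecvChanSize(recvChanSize, senderCount, scanKeyNumber int, targetIsCluster bool) int {
	if !targetIsCluster {
		return recvChanSize
	}
	if senderCount > recvChanSize {
		fmt.Printf("RecvChanSize is modified from [%v] to [%v]\n", recvChanSize, senderCount)
		recvChanSize = senderCount
	}
	if scanKeyNumber > recvChanSize {
		fmt.Printf("RecvChanSize is modified from [%v] to [%v]\n", recvChanSize, scanKeyNumber)
		recvChanSize = scanKeyNumber
	}
	return recvChanSize
}

func main() {
	// A sender.count of 5000 against the 4096 default grows the channel.
	fmt.Println(adjustRecvChanSize(4096, 5000, 50, true)) // 5000
}
```
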
13 changes: 8 additions & 5 deletions src/redis-shake/rump.go
@@ -13,9 +13,9 @@ import (
"redis-shake/configure"
"redis-shake/metric"
"redis-shake/scanner"
"redis-shake/filter"

"github.com/garyburd/redigo/redis"
"redis-shake/filter"
)

type CmdRump struct {
@@ -483,18 +483,21 @@ func (dre *dbRumperExecutor) doFetch(db int) error {
log.Debugf("dbRumper[%v] executor[%v] scan key: %v", dre.rumperId, dre.executorId, key)
dre.sourceClient.Send("DUMP", key)
}
dumps, err := redis.Strings(dre.sourceClient.Do(""))

reply, err := dre.sourceClient.Do("")
dumps, err := redis.Strings(reply, err)
if err != nil && err != redis.ErrNil {
return fmt.Errorf("do dump with failed[%v]", err)
return fmt.Errorf("do dump with failed[%v], reply[%v]", err, reply)
}

// pipeline ttl
for _, key := range keys {
dre.sourceClient.Send("PTTL", key)
}
pttls, err := redis.Int64s(dre.sourceClient.Do(""))
reply, err = dre.sourceClient.Do("")
pttls, err := redis.Int64s(reply, err)
if err != nil && err != redis.ErrNil {
return fmt.Errorf("do ttl with failed[%v]", err)
return fmt.Errorf("do ttl with failed[%v], reply[%v]", err, reply)
}

dre.stat.rCommands.Add(int64(len(keys)))
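
The doFetch change above splits `reply, err := dre.sourceClient.Do("")` out of the decode call so the raw reply can be included in the error message. The sketch below shows the same redigo pipelining pattern in isolation: Send queues DUMP (then PTTL) for each key in the batch, Do("") flushes and collects all pending replies, and redis.Strings / redis.Int64s decode them. The address and key names are placeholders.

```go
package main

import (
	"fmt"
	"log"

	"github.com/garyburd/redigo/redis"
)

func main() {
	c, err := redis.Dial("tcp", "127.0.0.1:6379") // placeholder source
	if err != nil {
		log.Fatal(err)
	}
	defer c.Close()

	keys := []string{"k1", "k2", "k3"}

	// pipeline DUMP for every key in the batch
	for _, key := range keys {
		if err := c.Send("DUMP", key); err != nil {
			log.Fatal(err)
		}
	}
	// Do("") flushes the pipeline and returns all pending replies as one slice.
	reply, err := c.Do("")
	dumps, err := redis.Strings(reply, err)
	if err != nil && err != redis.ErrNil {
		log.Fatalf("do dump with failed[%v], reply[%v]", err, reply)
	}

	// pipeline PTTL the same way
	for _, key := range keys {
		if err := c.Send("PTTL", key); err != nil {
			log.Fatal(err)
		}
	}
	reply, err = c.Do("")
	pttls, err := redis.Int64s(reply, err)
	if err != nil && err != redis.ErrNil {
		log.Fatalf("do ttl with failed[%v], reply[%v]", err, reply)
	}

	fmt.Println(len(dumps), pttls)
}
```
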
6 changes: 3 additions & 3 deletions src/vendor/vendor.json
@@ -159,10 +159,10 @@
"revisionTime": "2019-03-04T09:57:49Z"
},
{
"checksumSHA1": "1PrwQi6VvhLG4ovw1B7zD/afGZ4=",
"checksumSHA1": "qW5Sb6hcZxGHxZbkLoHrCQtYyBs=",
"path": "github.com/vinllen/redis-go-cluster",
"revision": "0799101ddfdb7d2bc9d47948f8a0cc17d23a216e",
"revisionTime": "2019-08-12T11:22:58Z"
"revision": "1883b18765aeed1a0b01320791dfe6103aa9a6b6",
"revisionTime": "2019-09-04T07:01:13Z"
},
{
"checksumSHA1": "U4rR1I0MXcvJz3zSxTp3hb3Y0I0=",
