Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Support increment sync #744

Open
wants to merge 1 commit into
base: v4
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions internal/client/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"crypto/tls"
"net"
"strconv"
"strings"
"time"

"RedisShake/internal/client/proto"
Expand All @@ -17,6 +18,8 @@ type Redis struct {
writer *bufio.Writer
protoReader *proto.Reader
protoWriter *proto.Writer

ReplId string
}

func NewRedisClient(address string, username string, password string, Tls bool) *Redis {
Expand Down Expand Up @@ -53,6 +56,21 @@ func NewRedisClient(address string, username string, password string, Tls bool)
}
}

// replId
r.ReplId = func() string {
reply := r.DoWithStringReply("info", "replication")
replyIdField := "master_replid"
idx1 := strings.Index(reply, replyIdField)
if idx1 < 0 {
log.Panicf("can not found replid with reply: %s", reply)
}
idx2 := strings.IndexByte(reply[idx1:], '\r')
if idx2 < 0 {
log.Panicf("can not found replid with reply: %s", reply)
}
return reply[idx1 + len(replyIdField) + 1 : idx1 + idx2]
}()

// ping to test connection
reply := r.DoWithStringReply("ping")
if reply != "PONG" {
Expand Down
85 changes: 70 additions & 15 deletions internal/reader/sync_standalone_reader.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package reader

import (
"context"
"bufio"
"context"
"fmt"
"io"
"io/fs"
"os"
"path/filepath"
"strconv"
Expand Down Expand Up @@ -82,11 +83,11 @@ func NewSyncStandaloneReader(opts *SyncReaderOptions) Reader {
r.opts = opts
r.client = client.NewRedisClient(opts.Address, opts.Username, opts.Password, opts.Tls)
r.rd = r.client.BufioReader()
r.stat.Name = "reader_" + strings.Replace(opts.Address, ":", "_", -1)
r.stat.Name = "reader_" + strings.Replace(opts.Address, ":", "_", -1) + "_" + r.client.ReplId
r.stat.Address = opts.Address
r.stat.Status = kHandShake
r.stat.Dir = utils.GetAbsPath(r.stat.Name)
utils.CreateEmptyDir(r.stat.Dir)
r.stat.AofReceivedOffset = readLastReplOffset(r.stat.Dir)
return r
}

Expand All @@ -95,20 +96,26 @@ func (r *syncStandaloneReader) StartRead(ctx context.Context) chan *entry.Entry
r.ch = make(chan *entry.Entry, 1024)
go func() {
r.sendReplconfListenPort()
r.sendPSync()
fullReSync := r.sendPSync()
go r.sendReplconfAck() // start sent replconf ack
rdbFilePath := r.receiveRDB()
startOffset := r.stat.AofReceivedOffset
go r.receiveAOF(r.rd)
if r.opts.SyncRdb {
if fullReSync {
// empty out of date file before full sync
utils.CreateEmptyDir(r.stat.Dir)
rdbFilePath := r.receiveRDB()
r.sendRDB(rdbFilePath)
}

// create aof file first
aofWriter := rotate.NewAOFWriter(r.stat.Name, r.stat.Dir, r.stat.AofReceivedOffset)
go r.receiveAOF(r.rd, aofWriter)
if r.opts.SyncAof {
r.stat.Status = kSyncAof
r.sendAOF(startOffset)
r.sendAOF(r.stat.AofReceivedOffset)
}
close(r.ch)
r.client.Close()
aofWriter.Close()
// must be closed last so that other resources can be released
close(r.ch)
}()

return r.ch
Expand All @@ -124,9 +131,15 @@ func (r *syncStandaloneReader) sendReplconfListenPort() {
}
}

func (r *syncStandaloneReader) sendPSync() {
// the return indicate whether full sync
func (r *syncStandaloneReader) sendPSync() bool {
// send PSync
argv := []string{"PSYNC", "?", "-1"}
var argv []string
if r.opts.SyncRdb || r.stat.AofReceivedOffset <= 0 {
argv = []string{"PSYNC", "?", "-1"}
} else {
argv = []string{"PSYNC", r.client.ReplId, strconv.FormatInt(r.stat.AofReceivedOffset, 10)}
}
if config.Opt.Advanced.AwsPSync != "" {
argv = []string{config.Opt.Advanced.GetPSyncCommand(r.stat.Address), "?", "-1"}
}
Expand All @@ -142,12 +155,27 @@ func (r *syncStandaloneReader) sendPSync() {
break
}
}

reply := r.client.ReceiveString()
if reply == "CONTINUE" {
log.Infof("increment sync start at last offset: %d", r.stat.AofReceivedOffset)
b, err := r.rd.ReadByte()
if err != nil {
log.Panicf(err.Error())
}
if b != '\n' {
log.Panicf("unexpected data:%s", string(b))
}
return false
}

// FULLRESYNC <replID> <offset>
masterOffset, err := strconv.Atoi(strings.Split(reply, " ")[2])
if err != nil {
log.Panicf(err.Error())
}
r.stat.AofReceivedOffset = int64(masterOffset)
return true
}

func (r *syncStandaloneReader) receiveRDB() string {
Expand Down Expand Up @@ -225,10 +253,8 @@ func (r *syncStandaloneReader) receiveRDB() string {
return rdbFilePath
}

func (r *syncStandaloneReader) receiveAOF(rd io.Reader) {
func (r *syncStandaloneReader) receiveAOF(rd io.Reader, aofWriter *rotate.AOFWriter) {
log.Debugf("[%s] start receiving aof data, and save to file", r.stat.Name)
aofWriter := rotate.NewAOFWriter(r.stat.Name, r.stat.Dir, r.stat.AofReceivedOffset)
defer aofWriter.Close()
buf := make([]byte, 16*1024) // 16KB is enough for writing file
for {
select {
Expand Down Expand Up @@ -344,3 +370,32 @@ func (r *syncStandaloneReader) StatusConsistent() bool {
r.stat.AofReceivedOffset == r.stat.AofSentOffset &&
len(r.ch) == 0
}

func readLastReplOffset(dir string) int64 {
var offset int64 = 0
if !utils.IsExist(dir) {
return 0
}
if err := filepath.Walk(dir, func(path string, info fs.FileInfo, err error) error {
if err != nil {
return err
}
ext := filepath.Ext(path)
if !info.IsDir() && (ext == ".aof") {
baseOffset, err := strconv.ParseInt(strings.TrimSuffix(info.Name(), ext), 10, 64)
if err != nil {
log.Warnf("illegal file name of aof: %s", info.Name())
return nil
}
if baseOffset + info.Size() > offset {
offset = baseOffset + info.Size()
}
}
return nil
}); err != nil {
log.Warnf("parse repl offset from aof file err: %s", err.Error())
return 0
}
log.Infof("read repl offset:%d for increment sync", offset)
return offset
}
12 changes: 6 additions & 6 deletions internal/utils/file_rotate/aof_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,39 +23,39 @@ func NewAOFReader(name string, dir string, offset int64) *AOFReader {
r := new(AOFReader)
r.name = name
r.dir = dir
r.openFile(offset)
r.offset = offset
r.openFile()
return r
}

func (r *AOFReader) openFile(offset int64) {
func (r *AOFReader) openFile() {
r.filepath = fmt.Sprintf("%s/%d.aof", r.dir, r.offset)
var err error
r.file, err = os.OpenFile(r.filepath, os.O_RDONLY, 0644)
if err != nil {
log.Panicf(err.Error())
}
r.offset = offset
r.pos = 0
log.Debugf("[%s] open file for read. filename=[%s]", r.name, r.filepath)
}

func (r *AOFReader) readNextFile(offset int64) {
func (r *AOFReader) readNextFile() {
filepath := fmt.Sprintf("%s/%d.aof", r.dir, r.offset)
if utils.IsExist(filepath) {
r.Close()
err := os.Remove(r.filepath)
if err != nil {
return
}
r.openFile(offset)
r.openFile()
}
}

func (r *AOFReader) Read(buf []byte) (n int, err error) {
n, err = r.file.Read(buf)
for err == io.EOF {
if r.filepath != fmt.Sprintf("%s/%d.aof", r.dir, r.offset) {
r.readNextFile(r.offset)
r.readNextFile()
}
time.Sleep(time.Millisecond * 10)
_, err = r.file.Seek(0, 1)
Expand Down
8 changes: 4 additions & 4 deletions internal/utils/file_rotate/aof_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,18 @@ func NewAOFWriter(name string, dir string, offset int64) *AOFWriter {
w := new(AOFWriter)
w.name = name
w.dir = dir
w.openFile(offset)
w.offset = offset
w.openFile()
return w
}

func (w *AOFWriter) openFile(offset int64) {
func (w *AOFWriter) openFile() {
w.filepath = fmt.Sprintf("%s/%d.aof", w.dir, w.offset)
var err error
w.file, err = os.OpenFile(w.filepath, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644)
if err != nil {
log.Panicf(err.Error())
}
w.offset = offset
w.filesize = 0
log.Debugf("[%s] open file for write. filename=[%s]", w.name, w.filepath)
}
Expand All @@ -48,7 +48,7 @@ func (w *AOFWriter) Write(buf []byte) {
w.filesize += int64(len(buf))
if w.filesize > MaxFileSize {
w.Close()
w.openFile(w.offset)
w.openFile()
}
err = w.file.Sync()
if err != nil {
Expand Down
Loading