detect binlog purged and report to Drainer #1017

Open · wants to merge 6 commits into base: master
22 changes: 21 additions & 1 deletion drainer/pump.go
@@ -24,6 +24,7 @@ import (
"github.com/pingcap/log"
"github.com/pingcap/tidb-binlog/pkg/util"
"github.com/pingcap/tidb-binlog/pump"
"github.com/pingcap/tidb-binlog/pump/storage"
"github.com/pingcap/tidb/store/tikv/oracle"
pb "github.com/pingcap/tipb/go-binlog"
"go.uber.org/zap"
@@ -101,9 +102,11 @@ func (p *Pump) PullBinlog(pctx context.Context, last int64) chan MergeItem {
labelReceive := "receive binlog"
labelCreateConn := "create conn"
labelPaused := "pump paused"
labelBinlogGCed := "binlog purged"
pLog.Add(labelReceive, 10*time.Second)
pLog.Add(labelCreateConn, 10*time.Second)
pLog.Add(labelPaused, 30*time.Second)
pLog.Add(labelBinlogGCed, 30*time.Second)

ret := make(chan MergeItem, binlogChanSize)

@@ -119,6 +122,7 @@ func (p *Pump) PullBinlog(pctx context.Context, last int64) chan MergeItem {
}()

needReCreateConn := false
isBinlogPurged := false
for {
if atomic.LoadInt32(&p.isClosed) == 1 {
return
@@ -134,6 +138,16 @@ func (p *Pump) PullBinlog(pctx context.Context, last int64) chan MergeItem {
continue
}

if isBinlogPurged {
// some binlogs have been purged in pump; just print the log and wait for the user to exit it manually.
pLog.Print(labelBinlogGCed, func() {
p.logger.Error("some binlogs have been purged in pump")
})

time.Sleep(time.Second)
Review comment (Contributor): why not exit directly? isBinlogPurged is never set back to false.

Reply (csuzhangxc, Member, Author, Dec 24, 2020): if we exit directly, systemd will just restart it again (which is meaningless). If it does not exit, the log and metrics can still keep working.

For isBinlogPurged, the user should handle it manually and re-establish the replication.

continue
}

if p.grpcConn == nil || needReCreateConn {
p.logger.Info("pump create pull binlogs client")
if err := p.createPullBinlogsClient(pctx, last); err != nil {
@@ -153,7 +167,13 @@ func (p *Pump) PullBinlog(pctx context.Context, last int64) chan MergeItem {
})
}

needReCreateConn = true
// Pump returns the binlog-GCed error via the gRPC response error.
if strings.Contains(err.Error(), storage.ErrRequestGCedBinlog.Error()) {
needReCreateConn = false // re-create connection will have no effect.
isBinlogPurged = true
} else {
needReCreateConn = true
}

time.Sleep(time.Second)
// TODO: add metric here
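The `strings.Contains` check above matches on the sentinel's message rather than on the error value, most likely because the error crosses the gRPC boundary and reaches the drainer as a flattened status message, so `errors.Is` against `storage.ErrRequestGCedBinlog` would not match. A minimal sketch of that check as a standalone helper (the helper name is illustrative, not part of this PR):

```go
// Hypothetical helper mirroring the check in the hunk above; not part of the PR.
package drainer

import (
	"strings"

	"github.com/pingcap/tidb-binlog/pump/storage"
)

// isBinlogPurgedErr reports whether a pull error means the pump has already
// purged the requested binlog. The sentinel arrives as a flattened gRPC
// status message, so we match on its text instead of the error value.
func isBinlogPurgedErr(err error) bool {
	if err == nil {
		return false
	}
	return strings.Contains(err.Error(), storage.ErrRequestGCedBinlog.Error())
}
```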
16 changes: 12 additions & 4 deletions pump/server.go
@@ -289,18 +289,24 @@ func (s *Server) PullBinlogs(in *binlog.PullBinlogReq, stream binlog.Pump_PullBi
last := in.StartFrom.Offset

gcTS := s.storage.GetGCTS()
if last <= gcTS {
if last != 0 && last <= gcTS {
// if requested with 0, send binlog starting from the oldest TS that has not been GCed.
// simple TS check before reading binlog; more checks are still needed in storage.
log.Error("drainer requested a purged binlog TS, some binlog events may be lost", zap.Int64("gc TS", gcTS), zap.Reflect("request", in))
return errors.Annotatef(storage.ErrRequestGCedBinlog, "requested TS %d, GC TS %d", last, gcTS)
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
binlogs := s.storage.PullCommitBinlog(ctx, last)
binlogs, errs := s.storage.PullCommitBinlog(ctx, last)

for {
select {
case <-s.pullClose:
return nil
case err2 := <-errs:
log.Error("pull binlog failed", zap.Error(err2))
return err2
case data, ok := <-binlogs:
if !ok {
return nil
@@ -636,12 +642,14 @@ func (s *Server) detectDrainerCheckPoints(ctx context.Context, gcTS int64) {
}

if drainer.MaxCommitTS < gcTS {
log.Error("drainer's checkpoint is older than pump gc ts, some binlogs are purged",
log.Error("drainer's checkpoint is older than pump alert gc ts, some binlogs may be purged after alert time",
zap.String("drainer", drainer.NodeID),
zap.Int64("gc ts", gcTS),
zap.Int64("alert gc ts", gcTS),
zap.Int64("drainer checkpoint", drainer.MaxCommitTS),
zap.Duration("alert time", earlyAlertGC),
)
// will add a test when binlog has failpoint support
// NOTE: this metric does not mean the binlogs have been purged, but that some binlogs will be purged after the alert time.
detectedDrainerBinlogPurged.WithLabelValues(drainer.NodeID).Inc()
}
}
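The renamed log fields suggest that the gcTS passed into detectDrainerCheckPoints is an alert threshold computed ahead of the real GC point by earlyAlertGC, so the warning and metric fire before anything is actually removed. How that threshold is derived is not shown in this hunk; a rough sketch under that assumption, using TiDB's oracle helpers (the function and its use here are illustrative only):

```go
// alertGCTS shifts a TSO timestamp forward by `early`, producing an "alert
// gc ts" against which drainer checkpoints can be compared. Assumption: the
// physical part of a TSO timestamp is in milliseconds.
package pump

import (
	"time"

	"github.com/pingcap/tidb/store/tikv/oracle"
)

func alertGCTS(gcTS int64, early time.Duration) int64 {
	physical := oracle.ExtractPhysical(uint64(gcTS))
	return int64(oracle.ComposeTS(physical+early.Milliseconds(), 0))
}
```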
13 changes: 7 additions & 6 deletions pump/server_test.go
@@ -136,22 +136,23 @@ func (s *noOpStorage) GetGCTS() int64 { return 0 }
func (s *noOpStorage) GC(ts int64) {}
func (s *noOpStorage) MaxCommitTS() int64 { return 0 }
func (s *noOpStorage) GetBinlog(ts int64) (*binlog.Binlog, error) { return nil, nil }
func (s *noOpStorage) PullCommitBinlog(ctx context.Context, last int64) <-chan []byte {
return make(chan []byte)
func (s *noOpStorage) PullCommitBinlog(ctx context.Context, last int64) (<-chan []byte, <-chan error) {
return make(chan []byte), make(chan error)
}
func (s *noOpStorage) Close() error { return nil }

type fakePullable struct{ noOpStorage }

func (s *fakePullable) PullCommitBinlog(ctx context.Context, last int64) <-chan []byte {
func (s *fakePullable) PullCommitBinlog(ctx context.Context, last int64) (<-chan []byte, <-chan error) {
chl := make(chan []byte)
errs := make(chan error)
go func() {
for i := 0; i < 3; i++ {
chl <- []byte(fmt.Sprintf("payload_%d", i))
}
close(chl)
}()
return chl
return chl, errs
}

func (s *pullBinlogsSuite) TestPullBinlogFromStorage(c *C) {
@@ -664,8 +665,8 @@ func (s *startStorage) MaxCommitTS() int64 { return 0 }
func (s *startStorage) GetBinlog(ts int64) (*binlog.Binlog, error) {
return nil, errors.New("server_test")
}
func (s *startStorage) PullCommitBinlog(ctx context.Context, last int64) <-chan []byte {
return make(chan []byte)
func (s *startStorage) PullCommitBinlog(ctx context.Context, last int64) (<-chan []byte, <-chan error) {
return make(chan []byte), make(<-chan error)
}
func (s *startStorage) Close() error {
<-s.sig
5 changes: 4 additions & 1 deletion pump/storage/bench_test.go
@@ -80,7 +80,7 @@ func benchmarkPull(b *testing.B, prewriteValueSize int, binlogNum int) {
runtime.GC()
b.ResetTimer()

pulller := append.PullCommitBinlog(context.Background(), 0)
pulller, errs := append.PullCommitBinlog(context.Background(), 0)

cnt := 0
for b := range pulller {
Expand All @@ -101,6 +101,9 @@ func benchmarkPull(b *testing.B, prewriteValueSize int, binlogNum int) {
// just count the prewriteValueSize
b.SetBytes(int64(prewriteValueSize))
b.ReportAllocs()
if len(errs) > 0 {
b.Fatalf("pull binlog got some errors")
}
}

func benchmarkWrite(b *testing.B, prewriteValueSize int, parallelism int, sync bool) {
75 changes: 56 additions & 19 deletions pump/storage/storage.go
@@ -43,13 +43,17 @@ import (

const (
maxTxnTimeoutSecond int64 = 600
gcMaxBlockTime = 30 * time.Minute // we run GC every 1 hour, but reading and sending binlog may block GC for at most this duration.
chanCapacity = 1 << 20
// if pump takes a long time to write binlog, pump will display the binlog meta information (unit: Second)
slowWriteThreshold = 1.0
defaultStopWriteAtAvailableSpace = 10 * (1 << 30)
)

var (
// ErrRequestGCedBinlog indicates a Drainer is requesting some purged binlogs.
ErrRequestGCedBinlog = errors.New("request a purged binlog")

// save gcTS, the max TS we have gc, for binlog not greater than gcTS, we can delete it from storage
gcTSKey = []byte("!binlog!gcts")
// save maxCommitTS, we can get binlog in range [gcTS, maxCommitTS] from PullCommitBinlog
@@ -83,7 +87,7 @@ type Storage interface {
GetBinlog(ts int64) (binlog *pb.Binlog, err error)

// PullCommitBinlog return the chan to consume the binlog
PullCommitBinlog(ctx context.Context, last int64) <-chan []byte
PullCommitBinlog(ctx context.Context, last int64) (<-chan []byte, <-chan error)

Close() error
}
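With the two-channel signature, every caller of the interface has to drain both channels; the PullBinlogs hunk in pump/server.go above shows the real pattern. A condensed sketch of the consumption contract (the function and the handle callback are illustrative, not from the PR):

```go
// consumePull drains both channels returned by the new PullCommitBinlog
// signature; illustrative only.
package storage

import "context"

func consumePull(ctx context.Context, s Storage, lastTS int64, handle func([]byte)) error {
	binlogs, errs := s.PullCommitBinlog(ctx, lastTS)
	for {
		select {
		case err := <-errs:
			return err // e.g. ErrRequestGCedBinlog when lastTS is older than the GC TS
		case data, ok := <-binlogs:
			if !ok {
				return nil // channel closed: the puller exited cleanly
			}
			handle(data)
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}
```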
@@ -104,7 +108,7 @@ type Append struct {
latestTS int64

gcWorking int32
gcTS int64
gcTS GCTS
maxCommitTS int64
headPointer valuePointer
handlePointer valuePointer
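The gcTS field changes from a plain int64 (previously read with sync/atomic) to a GCTS helper whose definition is not included in the hunks shown here. Judging from the calls that appear later in the diff (Load, Store, LoadAndLock, ReleaseLoadLock) and the gcMaxBlockTime comment, it presumably pairs the timestamp with a read/write lock so that PullCommitBinlog can hold GC back while it reads; a minimal sketch under that assumption:

```go
// GCTS is a hypothetical reconstruction of the helper behind Append.gcTS;
// the real definition lives elsewhere in the PR.
package storage

import "sync"

type GCTS struct {
	mu sync.RWMutex
	ts int64
}

// Load returns the current GC TS.
func (g *GCTS) Load() int64 {
	g.mu.RLock()
	defer g.mu.RUnlock()
	return g.ts
}

// Store updates the GC TS, waiting for any reader that still holds the lock
// (bounded by gcMaxBlockTime on the reader side).
func (g *GCTS) Store(ts int64) {
	g.mu.Lock()
	g.ts = ts
	g.mu.Unlock()
}

// LoadAndLock returns the GC TS and keeps the read lock held, so GC cannot
// advance until ReleaseLoadLock is called.
func (g *GCTS) LoadAndLock() int64 {
	g.mu.RLock()
	return g.ts
}

// ReleaseLoadLock releases the lock taken by LoadAndLock.
func (g *GCTS) ReleaseLoadLock() {
	g.mu.RUnlock()
}
```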
@@ -166,11 +170,12 @@ func NewAppendWithResolver(dir string, options *Options, tiStore kv.Storage, tiL
sortItems: make(chan sortItem, 1024),
}

append.gcTS, err = append.readGCTSFromDB()
gcTS, err := append.readGCTSFromDB()
if err != nil {
return nil, errors.Trace(err)
}
gcTSGauge.Set(float64(oracle.ExtractPhysical(uint64(append.gcTS))))
append.gcTS.Store(gcTS)
gcTSGauge.Set(float64(oracle.ExtractPhysical(uint64(gcTS))))

append.maxCommitTS, err = append.readInt64(maxCommitTSKey)
if err != nil {
Expand Down Expand Up @@ -214,7 +219,7 @@ func NewAppendWithResolver(dir string, options *Options, tiStore kv.Storage, tiL
minPointer = append.handlePointer
}

log.Info("Append info", zap.Int64("gcTS", append.gcTS),
log.Info("Append info", zap.Int64("gcTS", gcTS),
zap.Int64("maxCommitTS", append.maxCommitTS),
zap.Reflect("headPointer", append.headPointer),
zap.Reflect("handlePointer", append.handlePointer))
@@ -654,12 +659,12 @@ func (a *Append) Close() error {

// GetGCTS implement Storage.GetGCTS
func (a *Append) GetGCTS() int64 {
return atomic.LoadInt64(&a.gcTS)
return a.gcTS.Load()
}

// GC implement Storage.GC
func (a *Append) GC(ts int64) {
lastTS := atomic.LoadInt64(&a.gcTS)
lastTS := a.gcTS.Load()
if ts <= lastTS {
log.Info("ignore gc request", zap.Int64("ts", ts), zap.Int64("lastTS", lastTS))
return
@@ -673,7 +678,7 @@ func (a *Append) GC(ts int64) {
return
}

atomic.StoreInt64(&a.gcTS, ts)
a.gcTS.Store(ts) // once `Store` has returned, there is still no guarantee for metadata or vlog.
if err := a.saveGCTSToDB(ts); err != nil {
log.Error("Failed to save GCTS", zap.Int64("ts", ts), zap.Error(err))
}
@@ -1109,7 +1114,7 @@ func (a *Append) feedPreWriteValue(cbinlog *pb.Binlog) error {
}

// PullCommitBinlog return commit binlog > last
func (a *Append) PullCommitBinlog(ctx context.Context, last int64) <-chan []byte {
func (a *Append) PullCommitBinlog(ctx context.Context, last int64) (<-chan []byte, <-chan error) {
log.Debug("new PullCommitBinlog", zap.Int64("last ts", last))

ctx, cancel := context.WithCancel(ctx)
@@ -1121,14 +1126,21 @@ func (a *Append) PullCommitBinlog(ctx context.Context, last int64) <-chan []byte
}
}()

gcTS := atomic.LoadInt64(&a.gcTS)
values := make(chan []byte, 5)
errs := make(chan error, 5) // we `return` after sending an error now, so it should never block on this chan.

gcTS := a.gcTS.Load()
if last < gcTS {
log.Warn("last ts less than gcTS", zap.Int64("last ts", last), zap.Int64("gcTS", gcTS))
last = gcTS
if last == 0 {
log.Warn("last TS is 0, will send binlog from gcTS", zap.Int64("gcTS", gcTS))
last = gcTS
} else {
log.Error("last TS less than gcTS, some binlog events may be loss", zap.Int64("lastTS", last), zap.Int64("gcTS", gcTS))
errs <- errors.Annotatef(ErrRequestGCedBinlog, "requested TS %d, GC TS %d", last, gcTS)
return values, errs
}
}

values := make(chan []byte, 5)

irange := &util.Range{
Start: encodeTSKey(0),
Limit: encodeTSKey(math.MaxInt64),
@@ -1153,12 +1165,23 @@ func (a *Append) PullCommitBinlog(ctx context.Context, last int64) <-chan []byte
continue
}

// acquire the lock to block GC.
// NOTE: do not forget to release the lock carefully.
gcTS = a.gcTS.LoadAndLock()
if last < gcTS {
a.gcTS.ReleaseLoadLock()
log.Error("last TS less than gcTS, some binlog events may be loss", zap.Int64("lastTS", last), zap.Int64("gcTS", gcTS))
errs <- errors.Annotatef(ErrRequestGCedBinlog, "requested TS %d, GC TS %d", last, gcTS)
return
}

irange.Start = encodeTSKey(startTS)
irange.Limit = encodeTSKey(limitTS)
iter := a.metadata.NewIterator(irange, nil)

// log.Debugf("try to get range [%d,%d)", startTS, atomic.LoadInt64(&a.maxCommitTS)+1)

readForLoop:
for ok := iter.Seek(encodeTSKey(startTS)); ok; ok = iter.Next() {
var vp valuePointer
err := vp.UnmarshalBinary(iter.Value())
Expand All @@ -1171,17 +1194,21 @@ func (a *Append) PullCommitBinlog(ctx context.Context, last int64) <-chan []byte

value, err := a.vlog.readValue(vp)
if err != nil {
log.Error("read value failed", zap.Error(err))
iter.Release()
errorCount.WithLabelValues("read_value").Add(1.0)
a.gcTS.ReleaseLoadLock()
log.Error("read value failed", zap.Int64("TS", decodeTSKey(iter.Key())), zap.Error(err))
errs <- errors.Errorf("read value failed, TS %d", decodeTSKey(iter.Key()))
return
}

binlog := new(pb.Binlog)
err = binlog.Unmarshal(value)
if err != nil {
log.Error("Unmarshal Binlog failed", zap.Error(err))
iter.Release()
a.gcTS.ReleaseLoadLock()
log.Error("Unmarshal Binlog failed", zap.Int64("TS", decodeTSKey(iter.Key())), zap.Error(err))
errs <- errors.Errorf("Unmarshal Binlog failed, TS %d", decodeTSKey(iter.Key()))
return
}

Expand All @@ -1205,30 +1232,40 @@ func (a *Append) PullCommitBinlog(ctx context.Context, last int64) <-chan []byte
}

errorCount.WithLabelValues("feed_pre_write_value").Add(1.0)
log.Error("feed pre write value failed", zap.Error(err))
iter.Release()
a.gcTS.ReleaseLoadLock()
log.Error("feed pre write value failed", zap.Int64("TS", decodeTSKey(iter.Key())), zap.Error(err))
errs <- errors.Errorf("feed pre write value failed, TS %d", decodeTSKey(iter.Key()))
return
}
}

value, err = binlog.Marshal()
if err != nil {
log.Error("marshal failed", zap.Error(err))
iter.Release()
a.gcTS.ReleaseLoadLock()
log.Error("marshal failed", zap.Int64("TS", decodeTSKey(iter.Key())), zap.Error(err))
errs <- errors.Errorf("marshal failed, TS %d", decodeTSKey(iter.Key()))
return
}

select {
case values <- value:
log.Debug("send value success")
case <-time.After(gcMaxBlockTime):
// do not update `last` anymore.
log.Warn("can not send the binlog for a long time, will try to read again", zap.Duration("duration", gcMaxBlockTime), zap.Int64("current TS", decodeTSKey(iter.Key())))
Review comment (Contributor): should we send an error to errs here?

Reply (csuzhangxc, Member, Author): taking a long time to send the binlog is often caused by a slow drainer. If we send an error here, the replication will be broken and then retried, which should make things worse.

Reply (3pointer, Contributor, Dec 24, 2020): so, if the drainer encounters the purge error, all pumps will reach this warning 🤔️, but both the drainer and the pumps keep running until the user fixes it manually.

Reply (csuzhangxc, Member, Author): not all pumps, only this one. And the pump should still keep running, because other drainers may still work normally.

Reply (3pointer, Contributor, Dec 24, 2020): if one pump is blocked, it seems the merge source will be blocked soon, and then the drainer will block too, so I think exiting immediately here is better.

Reply (csuzhangxc, Member, Author): oh, I know what you mean. Any advice?
break readForLoop
case <-ctx.Done():
iter.Release()
a.gcTS.ReleaseLoadLock()
return
}

last = decodeTSKey(iter.Key())
}
iter.Release()
a.gcTS.ReleaseLoadLock()
err := iter.Error()
if err != nil {
log.Error("encounter iterator error", zap.Error(err))
@@ -1243,7 +1280,7 @@ func (a *Append) PullCommitBinlog(ctx context.Context, last int64) <-chan []byte
}
}()

return values
return values, errs
}

type storageSize struct {