Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

br: improve visualization of br #56612

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions br/pkg/glue/glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,3 +82,17 @@ type Progress interface {
// called.
Close()
}

// WithProgress starts a progress via g.StartProgress, hands it to cc, and
// closes the progress once cc has finished.
//
// The arguments ctx, cmdName, total and redirectLog are forwarded to
// g.StartProgress unchanged. The returned error is whatever cc returns.
func WithProgress(
	ctx context.Context,
	g Glue,
	cmdName string,
	total int64,
	redirectLog bool,
	cc func(p Progress) error,
) error {
	// runAndClose guarantees Close is called after cc, even if cc fails.
	runAndClose := func(progress Progress) error {
		defer progress.Close()
		return cc(progress)
	}
	return runAndClose(g.StartProgress(ctx, cmdName, total, redirectLog))
}
25 changes: 22 additions & 3 deletions br/pkg/restore/snap_client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,9 @@ const (
strictPlacementPolicyMode = "STRICT"
ignorePlacementPolicyMode = "IGNORE"

defaultDDLConcurrency = 16
maxSplitKeysOnce = 10240
defaultDDLConcurrency = 16
maxSplitKeysOnce = 10240
resetSpeedLimitRetryTimes = 3
)

const minBatchDdlSize = 1
Expand Down Expand Up @@ -948,7 +949,25 @@ func (rc *SnapClient) ExecDDLs(ctx context.Context, ddlJobs []*model.Job) error
return nil
}

func (rc *SnapClient) ResetSpeedLimit(ctx context.Context) error {
// resetSpeedLimit tries to reset the speed limit on a best-effort basis.
//
// It calls resetSpeedLimitInternal up to resetSpeedLimitRetryTimes times and
// waits (retry+3) seconds between attempts. Nothing is returned: if every
// attempt fails, the last error is logged at error level so the operator can
// reset the limit manually.
func (rc *SnapClient) resetSpeedLimit(ctx context.Context) {
	var resetErr error
	// TODO: in future we may need a mechanism to set speed limit with a TTL,
	// like what we do in switchmode.
	for retry := 0; retry < resetSpeedLimitRetryTimes; retry++ {
		resetErr = rc.resetSpeedLimitInternal(ctx)
		if resetErr == nil {
			return
		}
		if retry == resetSpeedLimitRetryTimes-1 {
			// No attempt follows, so do not announce a retry and do not
			// delay the final error report with another back-off.
			break
		}
		log.Warn("failed to reset speed limit, retry it",
			zap.Int("retry time", retry), logutil.ShortError(resetErr))

		// Back off before the next attempt, but stop waiting as soon as the
		// context is cancelled so a cancelled caller is not held for seconds.
		// The remaining attempts still run; they just no longer wait.
		backoff := time.NewTimer(time.Duration(retry+3) * time.Second)
		select {
		case <-backoff.C:
		case <-ctx.Done():
			backoff.Stop()
		}
	}
	if resetErr != nil {
		log.Error("failed to reset speed limit, please reset it manually", zap.Error(resetErr))
	}
}

func (rc *SnapClient) resetSpeedLimitInternal(ctx context.Context) error {
rc.hasSpeedLimited = false
err := rc.setSpeedLimit(ctx, 0)
if err != nil {
Expand Down
122 changes: 122 additions & 0 deletions br/pkg/restore/snap_client/pipeline_items.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
tidbutil "github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/engine"
pdhttp "github.com/tikv/pd/client/http"
"go.uber.org/multierr"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -106,6 +107,125 @@ func defaultOutputTableChan() chan *CreatedTable {
return make(chan *CreatedTable, defaultChannelSize)
}

// Exhaust drains the errors currently queued in ec into a slice and returns
// them in receive order. It never blocks: it returns as soon as the channel
// has nothing more to deliver.
//
// The returned slice is non-nil and may be empty.
func Exhaust(ec <-chan error) []error {
	out := make([]error, 0, len(ec))
	for {
		select {
		case err, ok := <-ec:
			if !ok {
				// The channel is closed and fully drained. Without this
				// check a closed channel would yield nil forever and the
				// default branch would never be reached.
				return out
			}
			out = append(out, err)
		default:
			// The error channel is normally never closed (it has several
			// senders), so we only consume the current backlog and return.
			return out
		}
	}
}

// PipelineContext bundles the switches, tuning knobs and clients that
// RestorePipeline needs to run its post-restore stages.
type PipelineContext struct {
// pipeline item switch

// Checksum enables the checksum validation stage.
Checksum bool
// LoadStats is forwarded to the update-meta-and-load-stats stage.
LoadStats bool
// WaitTiflashReady enables the stage that waits for TiFlash to be ready.
WaitTiflashReady bool

// pipeline item configuration

// LogProgress is negated and passed as the redirect-to-log argument when
// the pipeline progress is started.
LogProgress bool
// ChecksumConcurrency is the concurrency handed to the checksum stage.
ChecksumConcurrency uint
// StatsConcurrency is the concurrency handed to the stats stage.
StatsConcurrency uint

// pipeline item tool client

// KvClient is handed to the checksum stage.
KvClient kv.Client
// ExtStorage is handed to the stats stage.
ExtStorage storage.ExternalStorage
// Glue is used to start the pipeline progress.
Glue glue.Glue
}

// RestorePipeline runs the post-restore work for createdTables as a pipeline:
// an optional checksum stage, the update-meta/load-stats stage (always run),
// and an optional wait-for-TiFlash stage. Each table flows through the
// enabled stages in that order and is finally discarded.
//
// plCtx selects which stages run and supplies their concurrency and clients.
// A single progress is started whose total is one tick per table per stage.
//
// It returns once the last stage has consumed every table, or as soon as any
// stage reports an error. On error, the first error and every other error
// already queued are combined into the returned error.
func (rc *SnapClient) RestorePipeline(ctx context.Context, plCtx PipelineContext, createdTables []*CreatedTable) (err error) {
	// We make bigger errCh so we won't block on multi-part failed.
	errCh := make(chan error, 32)
	postHandleCh := afterTableRestoredCh(ctx, createdTables)

	tableCount := int64(len(createdTables))
	progressLen := tableCount // for pipeline item - update stats meta
	if plCtx.Checksum {
		progressLen += tableCount
	}
	if plCtx.WaitTiflashReady {
		progressLen += tableCount
	}

	// Redirect to log if there is no log file to avoid unreadable output.
	updateCh := plCtx.Glue.StartProgress(ctx, "Restore Pipeline", progressLen, !plCtx.LogProgress)
	defer updateCh.Close()

	// pipeline checksum
	if plCtx.Checksum {
		postHandleCh = rc.GoValidateChecksum(
			ctx, postHandleCh, plCtx.KvClient, errCh, updateCh, plCtx.ChecksumConcurrency)
	}

	// pipeline update meta and load stats
	postHandleCh = rc.GoUpdateMetaAndLoadStats(ctx, plCtx.ExtStorage, postHandleCh, errCh, updateCh, plCtx.StatsConcurrency, plCtx.LoadStats)

	// pipeline wait Tiflash synced
	if plCtx.WaitTiflashReady {
		postHandleCh = rc.GoWaitTiFlashReady(ctx, postHandleCh, updateCh, errCh)
	}

	finish := dropToBlackhole(ctx, postHandleCh, errCh)

	select {
	case err = <-errCh:
		err = multierr.Append(err, multierr.Combine(Exhaust(errCh)...))
	case <-finish:
		// When errCh and finish are both ready, select picks one at random.
		// Drain errCh here as well so an error queued just before the
		// pipeline finished (for example ctx.Err() sent by dropToBlackhole)
		// is not silently dropped. Combine returns nil when nothing is queued.
		err = multierr.Combine(Exhaust(errCh)...)
	}

	return errors.Trace(err)
}

// afterTableRestoredCh feeds createdTables, in order, into a new unbuffered
// channel from a background goroutine and returns the receive side.
//
// The channel is closed after the last table has been sent, or as soon as ctx
// is cancelled, whichever comes first.
func afterTableRestoredCh(ctx context.Context, createdTables []*CreatedTable) <-chan *CreatedTable {
	outCh := make(chan *CreatedTable)

	go func() {
		defer close(outCh)

		for _, createdTable := range createdTables {
			// The send must be a select case next to ctx.Done(). A send in a
			// default branch would block with no way out once the consumer
			// stops receiving, leaking this goroutine after cancellation.
			select {
			case <-ctx.Done():
				return
			case outCh <- createdTable:
			}
		}
	}()
	return outCh
}

// dropToBlackhole is the terminal stage of a table pipeline: it receives
// every table from inCh and discards it.
//
// The returned channel is closed when the background goroutine stops, which
// happens either when inCh is closed or when ctx is done. In the latter case
// ctx.Err() is sent to errCh before the goroutine stops.
func dropToBlackhole(
	ctx context.Context,
	inCh <-chan *CreatedTable,
	errCh chan<- error,
) <-chan struct{} {
	done := make(chan struct{}, 1)
	go func() {
		defer close(done)
		for {
			select {
			case _, open := <-inCh:
				if open {
					// Discard the table and keep draining.
					continue
				}
				return
			case <-ctx.Done():
				errCh <- ctx.Err()
				return
			}
		}
	}()
	return done
}

func concurrentHandleTablesCh(
ctx context.Context,
inCh <-chan *CreatedTable,
Expand Down Expand Up @@ -183,6 +303,7 @@ func (rc *SnapClient) GoUpdateMetaAndLoadStats(
s storage.ExternalStorage,
inCh <-chan *CreatedTable,
errCh chan<- error,
updateCh glue.Progress,
statsConcurrency uint,
loadStats bool,
) chan *CreatedTable {
Expand Down Expand Up @@ -233,6 +354,7 @@ func (rc *SnapClient) GoUpdateMetaAndLoadStats(
log.Error("update stats meta failed", zap.Any("table", tbl.Table), zap.Error(statsErr))
}
}
updateCh.Inc()
return nil
}, func() {
log.Info("all stats updated")
Expand Down
Loading