improve visualization of br
Signed-off-by: Jianjun Liao <[email protected]>
Leavrth committed Oct 12, 2024
1 parent 9b1fa1c commit 48329ac
Showing 7 changed files with 283 additions and 186 deletions.
7 changes: 7 additions & 0 deletions br/pkg/glue/glue.go
@@ -82,3 +82,10 @@ type Progress interface {
// called.
Close()
}

// WithProgress executes some logic with the progress, and closes it once the execution is done.
func WithProgress(ctx context.Context, g Glue, cmdName string, total int64, redirectLog bool, cc func(p Progress) error) error {
p := g.StartProgress(ctx, cmdName, total, redirectLog)
defer p.Close()
return cc(p)
}
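
To illustrate the intended usage, here is a minimal sketch of a hypothetical caller; the stage name, the items slice, and the helper function are placeholders, not taken from the actual call sites:

// Sketch only: run a batch of work under a progress bar that is closed
// automatically when the callback returns, even on error.
func runWithProgressExample(ctx context.Context, g glue.Glue, items []string) error {
	return glue.WithProgress(ctx, g, "Example Stage", int64(len(items)), false, func(p glue.Progress) error {
		for range items {
			// ... handle one unit of work ...
			p.Inc()
		}
		return nil
	})
}

The deferred Close inside WithProgress guarantees the progress bar is finished even when the callback returns an error.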
25 changes: 22 additions & 3 deletions br/pkg/restore/snap_client/client.go
@@ -66,8 +66,9 @@ const (
strictPlacementPolicyMode = "STRICT"
ignorePlacementPolicyMode = "IGNORE"

defaultDDLConcurrency = 16
maxSplitKeysOnce = 10240
defaultDDLConcurrency = 16
maxSplitKeysOnce = 10240
resetSpeedLimitRetryTimes = 3
)

const minBatchDdlSize = 1
@@ -948,7 +949,25 @@ func (rc *SnapClient) ExecDDLs(ctx context.Context, ddlJobs []*model.Job) error
return nil
}

func (rc *SnapClient) ResetSpeedLimit(ctx context.Context) error {
func (rc *SnapClient) resetSpeedLimit(ctx context.Context) {
var resetErr error
// TODO: in the future we may need a mechanism to set the speed limit with a TTL, like what we do in switch mode.
for retry := 0; retry < resetSpeedLimitRetryTimes; retry++ {
resetErr = rc.resetSpeedLimitInternal(ctx)
if resetErr != nil {
log.Warn("failed to reset speed limit, retry it",
zap.Int("retry time", retry), logutil.ShortError(resetErr))
time.Sleep(time.Duration(retry+3) * time.Second)
continue
}
break
}
if resetErr != nil {
log.Error("failed to reset speed limit, please reset it manually", zap.Error(resetErr))
}
}

func (rc *SnapClient) resetSpeedLimitInternal(ctx context.Context) error {
rc.hasSpeedLimited = false
err := rc.setSpeedLimit(ctx, 0)
if err != nil {
122 changes: 122 additions & 0 deletions br/pkg/restore/snap_client/pipeline_items.go
@@ -33,6 +33,7 @@ import (
tidbutil "github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/engine"
pdhttp "github.com/tikv/pd/client/http"
"go.uber.org/multierr"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"golang.org/x/sync/errgroup"
@@ -106,6 +107,125 @@ func defaultOutputTableChan() chan *CreatedTable {
return make(chan *CreatedTable, defaultChannelSize)
}

// Exhaust drains all remaining errors in the channel, into a slice of errors.
func Exhaust(ec <-chan error) []error {
out := make([]error, 0, len(ec))
for {
select {
case err := <-ec:
out = append(out, err)
default:
// errCh will NEVER be closed (it has multiple senders),
// so we just consume the current backlog of this channel, then return.
return out
}
}
}
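
Its consumption pattern appears later in RestorePipeline: the first error received is combined with whatever is still buffered in the channel, roughly:

// Combine the first received error with the drained backlog of errCh.
err = multierr.Append(err, multierr.Combine(Exhaust(errCh)...))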

type PipelineContext struct {
// pipeline item switch
Checksum bool
LoadStats bool
WaitTiflashReady bool

// pipeline item configuration
LogProgress bool
ChecksumConcurrency uint
StatsConcurrency uint

// pipeline item tool client
KvClient kv.Client
ExtStorage storage.ExternalStorage
Glue glue.Glue
}

// RestorePipeline does some work in the pipeline, such as checksum, loading stats, and waiting for TiFlash replicas to be ready.
func (rc *SnapClient) RestorePipeline(ctx context.Context, plCtx PipelineContext, createdTables []*CreatedTable) (err error) {
// We make a bigger errCh so we won't block when multiple parts fail.
errCh := make(chan error, 32)
postHandleCh := afterTableRestoredCh(ctx, createdTables)
progressLen := int64(0)
if plCtx.Checksum {
progressLen += int64(len(createdTables))
}
progressLen += int64(len(createdTables)) // for pipeline item - update stats meta
if plCtx.WaitTiflashReady {
progressLen += int64(len(createdTables))
}

// Redirect to log if there is no log file to avoid unreadable output.
updateCh := plCtx.Glue.StartProgress(ctx, "Restore Pipeline", progressLen, !plCtx.LogProgress)
defer updateCh.Close()
// pipeline checksum
if plCtx.Checksum {
postHandleCh = rc.GoValidateChecksum(
ctx, postHandleCh, plCtx.KvClient, errCh, updateCh, plCtx.ChecksumConcurrency)
}

// pipeline update meta and load stats
postHandleCh = rc.GoUpdateMetaAndLoadStats(ctx, plCtx.ExtStorage, postHandleCh, errCh, updateCh, plCtx.StatsConcurrency, plCtx.LoadStats)

// pipeline wait for TiFlash to be synced
if plCtx.WaitTiflashReady {
postHandleCh = rc.GoWaitTiFlashReady(ctx, postHandleCh, updateCh, errCh)
}

finish := dropToBlackhole(ctx, postHandleCh, errCh)

select {
case err = <-errCh:
err = multierr.Append(err, multierr.Combine(Exhaust(errCh)...))
case <-finish:
}

return errors.Trace(err)
}
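
For reference, a sketch of how a command might wire this stage up; the field values and the kvClient/extStorage/g/createdTables variables are illustrative assumptions rather than the real task code, and the snapclient import alias is assumed:

// Hypothetical wiring of the post-restore pipeline; all values are illustrative.
plCtx := snapclient.PipelineContext{
	Checksum:            true,
	LoadStats:           true,
	WaitTiflashReady:    true,
	LogProgress:         false,
	ChecksumConcurrency: 4,
	StatsConcurrency:    4,
	KvClient:            kvClient,
	ExtStorage:          extStorage,
	Glue:                g,
}
if err := rc.RestorePipeline(ctx, plCtx, createdTables); err != nil {
	return errors.Trace(err)
}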

func afterTableRestoredCh(ctx context.Context, createdTables []*CreatedTable) <-chan *CreatedTable {
outCh := make(chan *CreatedTable)

go func() {
defer close(outCh)

for _, createdTable := range createdTables {
select {
case <-ctx.Done():
return
default:
outCh <- createdTable
}
}
}()
return outCh
}

// dropToBlackhole drops all incoming tables into a black hole,
// i.e. it does no further processing, it just drains the channel until it is closed.
func dropToBlackhole(
ctx context.Context,
inCh <-chan *CreatedTable,
errCh chan<- error,
) <-chan struct{} {
outCh := make(chan struct{}, 1)
go func() {
defer func() {
close(outCh)
}()
for {
select {
case <-ctx.Done():
errCh <- ctx.Err()
return
case _, ok := <-inCh:
if !ok {
return
}
}
}
}()
return outCh
}

func concurrentHandleTablesCh(
ctx context.Context,
inCh <-chan *CreatedTable,
@@ -183,6 +303,7 @@ func (rc *SnapClient) GoUpdateMetaAndLoadStats(
s storage.ExternalStorage,
inCh <-chan *CreatedTable,
errCh chan<- error,
updateCh glue.Progress,
statsConcurrency uint,
loadStats bool,
) chan *CreatedTable {
@@ -233,6 +354,7 @@ func (rc *SnapClient) GoUpdateMetaAndLoadStats(
log.Error("update stats meta failed", zap.Any("table", tbl.Table), zap.Error(statsErr))
}
}
updateCh.Inc()
return nil
}, func() {
log.Info("all stats updated")
101 changes: 69 additions & 32 deletions br/pkg/restore/snap_client/tikv_sender.go
@@ -92,8 +92,8 @@ func mapTableToFiles(files []*backuppb.File) (map[int64][]*backuppb.File, int) {
}

// filterOutFiles filters out files that exist in the checkpoint set.
func filterOutFiles(checkpointSet map[string]struct{}, files []*backuppb.File, updateCh glue.Progress) []*backuppb.File {
progress := int(0)
func filterOutFiles(checkpointSet map[string]struct{}, files []*backuppb.File) ([]*backuppb.File, int64) {
progress := int64(0)
totalKVs := uint64(0)
totalBytes := uint64(0)
newFiles := make([]*backuppb.File, 0, len(files))
@@ -110,14 +110,12 @@ func filterOutFiles(checkpointSet map[string]struct{}, files []*backuppb.File, u
}
}
if progress > 0 {
// (split/scatter + download/ingest) / (default cf + write cf)
updateCh.IncBy(int64(progress) * 2 / 2)
summary.CollectSuccessUnit(summary.TotalKV, progress, totalKVs)
summary.CollectSuccessUnit(summary.SkippedKVCountByCheckpoint, progress, totalKVs)
summary.CollectSuccessUnit(summary.TotalBytes, progress, totalBytes)
summary.CollectSuccessUnit(summary.SkippedBytesByCheckpoint, progress, totalBytes)
summary.CollectSuccessUnit(summary.TotalKV, 1, totalKVs)
summary.CollectSuccessUnit(summary.SkippedKVCountByCheckpoint, 1, totalKVs)
summary.CollectSuccessUnit(summary.TotalBytes, 1, totalBytes)
summary.CollectSuccessUnit(summary.SkippedBytesByCheckpoint, 1, totalBytes)
}
return newFiles
return newFiles, progress
}

// If there are many tables with only a few rows, the number of merged SSTs will be too large.
Expand All @@ -131,8 +129,7 @@ func SortAndValidateFileRanges(
checkpointSetWithTableID map[int64]map[string]struct{},
splitSizeBytes, splitKeyCount uint64,
splitOnTable bool,
updateCh glue.Progress,
) ([][]byte, [][]TableIDWithFiles, error) {
) ([][]byte, [][]TableIDWithFiles, int64, int64, error) {
sortedPhysicalTables := getSortedPhysicalTables(createdTables)
// mapping table ID to its backup files
fileOfTable, hintSplitKeyCount := mapTableToFiles(allFiles)
@@ -149,24 +146,29 @@
lastFilesGroup []TableIDWithFiles = nil

// statistic
mergedRangeCount = 0
mergedRangeCount = 0
totalWriteCFFile int = 0
totalDefaultCFFile int = 0
skipFileCount int64 = 0
)

log.Info("start to merge ranges", zap.Uint64("kv size threshold", splitSizeBytes), zap.Uint64("kv count threshold", splitKeyCount))
for _, table := range sortedPhysicalTables {
files := fileOfTable[table.OldPhysicalID]
for _, file := range files {
if err := restoreutils.ValidateFileRewriteRule(file, table.RewriteRules); err != nil {
return nil, nil, errors.Trace(err)
return nil, nil, 0, 0, errors.Trace(err)
}
}
// Merge small ranges to reduce split and scatter regions.
// Notice that the files having the same start key and end key are in the same range.
sortedRanges, stat, err := restoreutils.MergeAndRewriteFileRanges(
files, table.RewriteRules, splitSizeBytes, splitKeyCount)
if err != nil {
return nil, nil, errors.Trace(err)
return nil, nil, 0, 0, errors.Trace(err)
}
totalDefaultCFFile += stat.TotalDefaultCFFile
totalWriteCFFile += stat.TotalWriteCFFile
log.Info("merge and validate file",
zap.Int64("new physical ID", table.NewPhysicalID),
zap.Int64("old physical ID", table.OldPhysicalID),
@@ -233,7 +235,8 @@ func SortAndValidateFileRanges(
// The checkpoint filters out the files that were already imported in previous restore executions.
// Notice that ranges are skipped after the split keys are selected, in order to keep the split keys
// always the same.
newFiles := filterOutFiles(checkpointSet, rg.Files, updateCh)
newFiles, progress := filterOutFiles(checkpointSet, rg.Files)
skipFileCount += progress
// append the new files into the group
if len(newFiles) > 0 {
if len(lastFilesGroup) == 0 || lastFilesGroup[len(lastFilesGroup)-1].TableID != table.NewPhysicalID {
@@ -276,20 +279,38 @@ func SortAndValidateFileRanges(
zap.Int("merged range count", mergedRangeCount))
tableIDWithFilesGroup = append(tableIDWithFilesGroup, lastFilesGroup)
}
return sortedSplitKeys, tableIDWithFilesGroup, nil
summary.CollectInt("default CF files", totalDefaultCFFile)
summary.CollectInt("write CF files", totalWriteCFFile)
log.Info("range and file prepared", zap.Int("default file count", totalDefaultCFFile), zap.Int("write file count", totalWriteCFFile))
return sortedSplitKeys, tableIDWithFilesGroup, skipFileCount, int64(totalDefaultCFFile + totalWriteCFFile), nil
}

type RestoreTablesContext struct {
// configuration
LogProgress bool
SplitSizeBytes uint64
SplitKeyCount uint64
SplitOnTable bool
Online bool

// data
CreatedTables []*CreatedTable
AllFiles []*backuppb.File
CheckpointSetWithTableID map[int64]map[string]struct{}

// tool client
Glue glue.Glue
}

func (rc *SnapClient) RestoreTables(
ctx context.Context,
placementRuleManager PlacementRuleManager,
createdTables []*CreatedTable,
allFiles []*backuppb.File,
checkpointSetWithTableID map[int64]map[string]struct{},
splitSizeBytes, splitKeyCount uint64,
splitOnTable bool,
updateCh glue.Progress,
rtCtx RestoreTablesContext,
) error {
if err := placementRuleManager.SetPlacementRule(ctx, createdTables); err != nil {
placementRuleManager, err := NewPlacementRuleManager(ctx, rc.pdClient, rc.pdHTTPClient, rc.tlsConf, rtCtx.Online)
if err != nil {
return errors.Trace(err)
}
if err := placementRuleManager.SetPlacementRule(ctx, rtCtx.CreatedTables); err != nil {
return errors.Trace(err)
}
defer func() {
@@ -300,26 +321,39 @@ func (rc *SnapClient) RestoreTables(
}()

start := time.Now()
sortedSplitKeys, tableIDWithFilesGroup, err := SortAndValidateFileRanges(createdTables, allFiles, checkpointSetWithTableID, splitSizeBytes, splitKeyCount, splitOnTable, updateCh)
sortedSplitKeys, tableIDWithFilesGroup, progressSkip, progressLen, err :=
SortAndValidateFileRanges(rtCtx.CreatedTables, rtCtx.AllFiles, rtCtx.CheckpointSetWithTableID, rtCtx.SplitSizeBytes, rtCtx.SplitKeyCount, rtCtx.SplitOnTable)
if err != nil {
return errors.Trace(err)
}
log.Info("Restore Stage Duration", zap.String("stage", "merge ranges"), zap.Duration("take", time.Since(start)))
elapsed := time.Since(start)
summary.CollectDuration("merge ranges", elapsed)
log.Info("Restore Stage Duration", zap.String("stage", "merge ranges"), zap.Duration("take", elapsed))

start = time.Now()
if err = rc.SplitPoints(ctx, sortedSplitKeys, updateCh, false); err != nil {
if err := glue.WithProgress(ctx, rtCtx.Glue, "Split&Scatter Region", int64(len(sortedSplitKeys)), !rtCtx.LogProgress, func(updateCh glue.Progress) error {
return rc.SplitPoints(ctx, sortedSplitKeys, updateCh, false)
}); err != nil {
return errors.Trace(err)
}
log.Info("Restore Stage Duration", zap.String("stage", "split regions"), zap.Duration("take", time.Since(start)))
elapsed = time.Since(start)
summary.CollectDuration("split region", elapsed)
summary.CollectSuccessUnit("split keys", 1, len(sortedSplitKeys))
log.Info("Restore Stage Duration", zap.String("stage", "split regions"), zap.Duration("take", elapsed))

start = time.Now()
if err = rc.RestoreSSTFiles(ctx, tableIDWithFilesGroup, updateCh); err != nil {
if err := glue.WithProgress(ctx, rtCtx.Glue, "Download&Ingest SST", progressLen, !rtCtx.LogProgress, func(updateCh glue.Progress) error {
updateCh.IncBy(progressSkip)
return rc.RestoreSSTFiles(ctx, tableIDWithFilesGroup, updateCh)
}); err != nil {
return errors.Trace(err)
}
elapsed := time.Since(start)

elapsed = time.Since(start)
summary.CollectDuration("restore files", elapsed)
log.Info("Restore Stage Duration", zap.String("stage", "restore files"), zap.Duration("take", elapsed))

summary.CollectSuccessUnit("files", len(allFiles), elapsed)
summary.CollectSuccessUnit("files", len(rtCtx.AllFiles), elapsed)
return nil
}
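
Similarly, a caller of the refactored entry point might look roughly like this; it is a sketch under assumptions, where the split thresholds and the createdTables/allFiles/checkpointSet/g variables are placeholders and the snapclient import alias is assumed:

// Hypothetical call site for the new RestoreTables signature; values are illustrative.
rtCtx := snapclient.RestoreTablesContext{
	LogProgress:              false,
	SplitSizeBytes:           96 * 1024 * 1024, // placeholder threshold
	SplitKeyCount:            960000,           // placeholder threshold
	SplitOnTable:             false,
	Online:                   false,
	CreatedTables:            createdTables,
	AllFiles:                 allFiles,
	CheckpointSetWithTableID: checkpointSet,
	Glue:                     g,
}
if err := rc.RestoreTables(ctx, rtCtx); err != nil {
	return errors.Trace(err)
}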

@@ -373,6 +407,7 @@ func (rc *SnapClient) RestoreSSTFiles(
if err := rc.setSpeedLimit(ctx, rc.rateLimit); err != nil {
return errors.Trace(err)
}
defer rc.resetSpeedLimit(ctx)

failpoint.Inject("corrupt-files", func(v failpoint.Value) {
if cmd, ok := v.(string); ok {
@@ -418,7 +453,9 @@ func (rc *SnapClient) restoreSSTFilesInternal(
if restoreErr == nil {
log.Info("import files done", zapFilesGroup(filesReplica),
zap.Duration("take", time.Since(fileStart)))
updateCh.Inc()
for _, filesGroup := range filesReplica {
updateCh.IncBy(int64(len(filesGroup.Files)))
}
}
}()
if importErr := rc.fileImporter.ImportSSTFiles(ectx, filesReplica, rc.cipher, rc.dom.Store().GetCodec().GetAPIVersion()); importErr != nil {
