From 48329ac73ce0fad6b1fb80b1c5e0e132637f1e40 Mon Sep 17 00:00:00 2001 From: Jianjun Liao Date: Sat, 12 Oct 2024 15:04:52 +0800 Subject: [PATCH] improve visualization of br Signed-off-by: Jianjun Liao --- br/pkg/glue/glue.go | 7 + br/pkg/restore/snap_client/client.go | 25 ++- br/pkg/restore/snap_client/pipeline_items.go | 122 +++++++++++++ br/pkg/restore/snap_client/tikv_sender.go | 101 +++++++---- .../restore/snap_client/tikv_sender_test.go | 40 ++++- br/pkg/task/restore.go | 162 +++--------------- br/pkg/task/stream.go | 12 +- 7 files changed, 283 insertions(+), 186 deletions(-) diff --git a/br/pkg/glue/glue.go b/br/pkg/glue/glue.go index d691d638169ab..61ba58b94faed 100644 --- a/br/pkg/glue/glue.go +++ b/br/pkg/glue/glue.go @@ -82,3 +82,10 @@ type Progress interface { // called. Close() } + +// WithProgress execute some logic with the progress, and close it once the execution done. +func WithProgress(ctx context.Context, g Glue, cmdName string, total int64, redirectLog bool, cc func(p Progress) error) error { + p := g.StartProgress(ctx, cmdName, total, redirectLog) + defer p.Close() + return cc(p) +} diff --git a/br/pkg/restore/snap_client/client.go b/br/pkg/restore/snap_client/client.go index d0f68157ff410..31efb1d0fba76 100644 --- a/br/pkg/restore/snap_client/client.go +++ b/br/pkg/restore/snap_client/client.go @@ -66,8 +66,9 @@ const ( strictPlacementPolicyMode = "STRICT" ignorePlacementPolicyMode = "IGNORE" - defaultDDLConcurrency = 16 - maxSplitKeysOnce = 10240 + defaultDDLConcurrency = 16 + maxSplitKeysOnce = 10240 + resetSpeedLimitRetryTimes = 3 ) const minBatchDdlSize = 1 @@ -948,7 +949,25 @@ func (rc *SnapClient) ExecDDLs(ctx context.Context, ddlJobs []*model.Job) error return nil } -func (rc *SnapClient) ResetSpeedLimit(ctx context.Context) error { +func (rc *SnapClient) resetSpeedLimit(ctx context.Context) { + var resetErr error + // In future we may need a mechanism to set speed limit in ttl. like what we do in switchmode. 
TODO + for retry := 0; retry < resetSpeedLimitRetryTimes; retry++ { + resetErr = rc.resetSpeedLimitInternal(ctx) + if resetErr != nil { + log.Warn("failed to reset speed limit, retry it", + zap.Int("retry time", retry), logutil.ShortError(resetErr)) + time.Sleep(time.Duration(retry+3) * time.Second) + continue + } + break + } + if resetErr != nil { + log.Error("failed to reset speed limit, please reset it manually", zap.Error(resetErr)) + } +} + +func (rc *SnapClient) resetSpeedLimitInternal(ctx context.Context) error { rc.hasSpeedLimited = false err := rc.setSpeedLimit(ctx, 0) if err != nil { diff --git a/br/pkg/restore/snap_client/pipeline_items.go b/br/pkg/restore/snap_client/pipeline_items.go index f76417a636c4a..d24277d48d5c7 100644 --- a/br/pkg/restore/snap_client/pipeline_items.go +++ b/br/pkg/restore/snap_client/pipeline_items.go @@ -33,6 +33,7 @@ import ( tidbutil "github.com/pingcap/tidb/pkg/util" "github.com/pingcap/tidb/pkg/util/engine" pdhttp "github.com/tikv/pd/client/http" + "go.uber.org/multierr" "go.uber.org/zap" "go.uber.org/zap/zapcore" "golang.org/x/sync/errgroup" @@ -106,6 +107,125 @@ func defaultOutputTableChan() chan *CreatedTable { return make(chan *CreatedTable, defaultChannelSize) } +// Exhaust drains all remaining errors in the channel, into a slice of errors. +func Exhaust(ec <-chan error) []error { + out := make([]error, 0, len(ec)) + for { + select { + case err := <-ec: + out = append(out, err) + default: + // errCh will NEVER be closed(ya see, it has multi sender-part), + // so we just consume the current backlog of this channel, then return. 
+ return out + } + } +} + +type PipelineContext struct { + // pipeline item switch + Checksum bool + LoadStats bool + WaitTiflashReady bool + + // pipeline item configuration + LogProgress bool + ChecksumConcurrency uint + StatsConcurrency uint + + // pipeline item tool client + KvClient kv.Client + ExtStorage storage.ExternalStorage + Glue glue.Glue +} + +// RestorePipeline does some work in pipeline, such as checksum, load stats and wait tiflash ready. +func (rc *SnapClient) RestorePipeline(ctx context.Context, plCtx PipelineContext, createdTables []*CreatedTable) (err error) { + // We make bigger errCh so we won't block on multi-part failed. + errCh := make(chan error, 32) + postHandleCh := afterTableRestoredCh(ctx, createdTables) + progressLen := int64(0) + if plCtx.Checksum { + progressLen += int64(len(createdTables)) + } + progressLen += int64(len(createdTables)) // for pipeline item - update stats meta + if plCtx.WaitTiflashReady { + progressLen += int64(len(createdTables)) + } + + // Redirect to log if there is no log file to avoid unreadable output. 
+ updateCh := plCtx.Glue.StartProgress(ctx, "Restore Pipeline", progressLen, !plCtx.LogProgress) + defer updateCh.Close() + // pipeline checksum + if plCtx.Checksum { + postHandleCh = rc.GoValidateChecksum( + ctx, postHandleCh, plCtx.KvClient, errCh, updateCh, plCtx.ChecksumConcurrency) + } + + // pipeline update meta and load stats + postHandleCh = rc.GoUpdateMetaAndLoadStats(ctx, plCtx.ExtStorage, postHandleCh, errCh, updateCh, plCtx.StatsConcurrency, plCtx.LoadStats) + + // pipeline wait Tiflash synced + if plCtx.WaitTiflashReady { + postHandleCh = rc.GoWaitTiFlashReady(ctx, postHandleCh, updateCh, errCh) + } + + finish := dropToBlackhole(ctx, postHandleCh, errCh) + + select { + case err = <-errCh: + err = multierr.Append(err, multierr.Combine(Exhaust(errCh)...)) + case <-finish: + } + + return errors.Trace(err) +} + +func afterTableRestoredCh(ctx context.Context, createdTables []*CreatedTable) <-chan *CreatedTable { + outCh := make(chan *CreatedTable) + + go func() { + defer close(outCh) + + for _, createdTable := range createdTables { + select { + case <-ctx.Done(): + return + default: + outCh <- createdTable + } + } + }() + return outCh +} + +// dropToBlackhole drop all incoming tables into black hole, +// i.e. don't execute checksum, just increase the process anyhow. 
+func dropToBlackhole( + ctx context.Context, + inCh <-chan *CreatedTable, + errCh chan<- error, +) <-chan struct{} { + outCh := make(chan struct{}, 1) + go func() { + defer func() { + close(outCh) + }() + for { + select { + case <-ctx.Done(): + errCh <- ctx.Err() + return + case _, ok := <-inCh: + if !ok { + return + } + } + } + }() + return outCh +} + func concurrentHandleTablesCh( ctx context.Context, inCh <-chan *CreatedTable, @@ -183,6 +303,7 @@ func (rc *SnapClient) GoUpdateMetaAndLoadStats( s storage.ExternalStorage, inCh <-chan *CreatedTable, errCh chan<- error, + updateCh glue.Progress, statsConcurrency uint, loadStats bool, ) chan *CreatedTable { @@ -233,6 +354,7 @@ func (rc *SnapClient) GoUpdateMetaAndLoadStats( log.Error("update stats meta failed", zap.Any("table", tbl.Table), zap.Error(statsErr)) } } + updateCh.Inc() return nil }, func() { log.Info("all stats updated") diff --git a/br/pkg/restore/snap_client/tikv_sender.go b/br/pkg/restore/snap_client/tikv_sender.go index a5f27a87c4050..4c4050ba902e0 100644 --- a/br/pkg/restore/snap_client/tikv_sender.go +++ b/br/pkg/restore/snap_client/tikv_sender.go @@ -92,8 +92,8 @@ func mapTableToFiles(files []*backuppb.File) (map[int64][]*backuppb.File, int) { } // filterOutFiles filters out files that exist in the checkpoint set. 
-func filterOutFiles(checkpointSet map[string]struct{}, files []*backuppb.File, updateCh glue.Progress) []*backuppb.File { - progress := int(0) +func filterOutFiles(checkpointSet map[string]struct{}, files []*backuppb.File) ([]*backuppb.File, int64) { + progress := int64(0) totalKVs := uint64(0) totalBytes := uint64(0) newFiles := make([]*backuppb.File, 0, len(files)) @@ -110,14 +110,12 @@ func filterOutFiles(checkpointSet map[string]struct{}, files []*backuppb.File, u } } if progress > 0 { - // (split/scatter + download/ingest) / (default cf + write cf) - updateCh.IncBy(int64(progress) * 2 / 2) - summary.CollectSuccessUnit(summary.TotalKV, progress, totalKVs) - summary.CollectSuccessUnit(summary.SkippedKVCountByCheckpoint, progress, totalKVs) - summary.CollectSuccessUnit(summary.TotalBytes, progress, totalBytes) - summary.CollectSuccessUnit(summary.SkippedBytesByCheckpoint, progress, totalBytes) + summary.CollectSuccessUnit(summary.TotalKV, 1, totalKVs) + summary.CollectSuccessUnit(summary.SkippedKVCountByCheckpoint, 1, totalKVs) + summary.CollectSuccessUnit(summary.TotalBytes, 1, totalBytes) + summary.CollectSuccessUnit(summary.SkippedBytesByCheckpoint, 1, totalBytes) } - return newFiles + return newFiles, progress } // If there are many tables with only a few rows, the number of merged SSTs will be too large. 
@@ -131,8 +129,7 @@ func SortAndValidateFileRanges( checkpointSetWithTableID map[int64]map[string]struct{}, splitSizeBytes, splitKeyCount uint64, splitOnTable bool, - updateCh glue.Progress, -) ([][]byte, [][]TableIDWithFiles, error) { +) ([][]byte, [][]TableIDWithFiles, int64, int64, error) { sortedPhysicalTables := getSortedPhysicalTables(createdTables) // mapping table ID to its backup files fileOfTable, hintSplitKeyCount := mapTableToFiles(allFiles) @@ -149,7 +146,10 @@ func SortAndValidateFileRanges( lastFilesGroup []TableIDWithFiles = nil // statistic - mergedRangeCount = 0 + mergedRangeCount = 0 + totalWriteCFFile int = 0 + totalDefaultCFFile int = 0 + skipFileCount int64 = 0 ) log.Info("start to merge ranges", zap.Uint64("kv size threshold", splitSizeBytes), zap.Uint64("kv count threshold", splitKeyCount)) @@ -157,7 +157,7 @@ func SortAndValidateFileRanges( files := fileOfTable[table.OldPhysicalID] for _, file := range files { if err := restoreutils.ValidateFileRewriteRule(file, table.RewriteRules); err != nil { - return nil, nil, errors.Trace(err) + return nil, nil, 0, 0, errors.Trace(err) } } // Merge small ranges to reduce split and scatter regions. @@ -165,8 +165,10 @@ func SortAndValidateFileRanges( sortedRanges, stat, err := restoreutils.MergeAndRewriteFileRanges( files, table.RewriteRules, splitSizeBytes, splitKeyCount) if err != nil { - return nil, nil, errors.Trace(err) + return nil, nil, 0, 0, errors.Trace(err) } + totalDefaultCFFile += stat.TotalDefaultCFFile + totalWriteCFFile += stat.TotalWriteCFFile log.Info("merge and validate file", zap.Int64("new physical ID", table.NewPhysicalID), zap.Int64("old physical ID", table.OldPhysicalID), @@ -233,7 +235,8 @@ func SortAndValidateFileRanges( // checkpoint filter out the import done files in the previous restore executions. // Notice that skip ranges after select split keys in order to make the split keys // always the same. 
- newFiles := filterOutFiles(checkpointSet, rg.Files, updateCh) + newFiles, progress := filterOutFiles(checkpointSet, rg.Files) + skipFileCount += progress // append the new files into the group if len(newFiles) > 0 { if len(lastFilesGroup) == 0 || lastFilesGroup[len(lastFilesGroup)-1].TableID != table.NewPhysicalID { @@ -276,20 +279,38 @@ func SortAndValidateFileRanges( zap.Int("merged range count", mergedRangeCount)) tableIDWithFilesGroup = append(tableIDWithFilesGroup, lastFilesGroup) } - return sortedSplitKeys, tableIDWithFilesGroup, nil + summary.CollectInt("default CF files", totalDefaultCFFile) + summary.CollectInt("write CF files", totalWriteCFFile) + log.Info("range and file prepared", zap.Int("default file count", totalDefaultCFFile), zap.Int("write file count", totalWriteCFFile)) + return sortedSplitKeys, tableIDWithFilesGroup, skipFileCount, int64(totalDefaultCFFile + totalWriteCFFile), nil +} + +type RestoreTablesContext struct { + // configuration + LogProgress bool + SplitSizeBytes uint64 + SplitKeyCount uint64 + SplitOnTable bool + Online bool + + // data + CreatedTables []*CreatedTable + AllFiles []*backuppb.File + CheckpointSetWithTableID map[int64]map[string]struct{} + + // tool client + Glue glue.Glue } func (rc *SnapClient) RestoreTables( ctx context.Context, - placementRuleManager PlacementRuleManager, - createdTables []*CreatedTable, - allFiles []*backuppb.File, - checkpointSetWithTableID map[int64]map[string]struct{}, - splitSizeBytes, splitKeyCount uint64, - splitOnTable bool, - updateCh glue.Progress, + rtCtx RestoreTablesContext, ) error { - if err := placementRuleManager.SetPlacementRule(ctx, createdTables); err != nil { + placementRuleManager, err := NewPlacementRuleManager(ctx, rc.pdClient, rc.pdHTTPClient, rc.tlsConf, rtCtx.Online) + if err != nil { + return errors.Trace(err) + } + if err := placementRuleManager.SetPlacementRule(ctx, rtCtx.CreatedTables); err != nil { return errors.Trace(err) } defer func() { @@ -300,26 +321,39 @@ 
func (rc *SnapClient) RestoreTables( }() start := time.Now() - sortedSplitKeys, tableIDWithFilesGroup, err := SortAndValidateFileRanges(createdTables, allFiles, checkpointSetWithTableID, splitSizeBytes, splitKeyCount, splitOnTable, updateCh) + sortedSplitKeys, tableIDWithFilesGroup, progressSkip, progressLen, err := + SortAndValidateFileRanges(rtCtx.CreatedTables, rtCtx.AllFiles, rtCtx.CheckpointSetWithTableID, rtCtx.SplitSizeBytes, rtCtx.SplitKeyCount, rtCtx.SplitOnTable) if err != nil { return errors.Trace(err) } - log.Info("Restore Stage Duration", zap.String("stage", "merge ranges"), zap.Duration("take", time.Since(start))) + elapsed := time.Since(start) + summary.CollectDuration("merge ranges", elapsed) + log.Info("Restore Stage Duration", zap.String("stage", "merge ranges"), zap.Duration("take", elapsed)) start = time.Now() - if err = rc.SplitPoints(ctx, sortedSplitKeys, updateCh, false); err != nil { + if err := glue.WithProgress(ctx, rtCtx.Glue, "Split&Scatter Region", int64(len(sortedSplitKeys)), !rtCtx.LogProgress, func(updateCh glue.Progress) error { + return rc.SplitPoints(ctx, sortedSplitKeys, updateCh, false) + }); err != nil { return errors.Trace(err) } - log.Info("Restore Stage Duration", zap.String("stage", "split regions"), zap.Duration("take", time.Since(start))) + elapsed = time.Since(start) + summary.CollectDuration("split region", elapsed) + summary.CollectSuccessUnit("split keys", 1, len(sortedSplitKeys)) + log.Info("Restore Stage Duration", zap.String("stage", "split regions"), zap.Duration("take", elapsed)) start = time.Now() - if err = rc.RestoreSSTFiles(ctx, tableIDWithFilesGroup, updateCh); err != nil { + if err := glue.WithProgress(ctx, rtCtx.Glue, "Download&Ingest SST", progressLen, !rtCtx.LogProgress, func(updateCh glue.Progress) error { + updateCh.IncBy(progressSkip) + return rc.RestoreSSTFiles(ctx, tableIDWithFilesGroup, updateCh) + }); err != nil { return errors.Trace(err) } - elapsed := time.Since(start) + + elapsed = 
time.Since(start) + summary.CollectDuration("restore files", elapsed) log.Info("Restore Stage Duration", zap.String("stage", "restore files"), zap.Duration("take", elapsed)) - summary.CollectSuccessUnit("files", len(allFiles), elapsed) + summary.CollectSuccessUnit("files", len(rtCtx.AllFiles), elapsed) return nil } @@ -373,6 +407,7 @@ func (rc *SnapClient) RestoreSSTFiles( if err := rc.setSpeedLimit(ctx, rc.rateLimit); err != nil { return errors.Trace(err) } + defer rc.resetSpeedLimit(ctx) failpoint.Inject("corrupt-files", func(v failpoint.Value) { if cmd, ok := v.(string); ok { @@ -418,7 +453,9 @@ func (rc *SnapClient) restoreSSTFilesInternal( if restoreErr == nil { log.Info("import files done", zapFilesGroup(filesReplica), zap.Duration("take", time.Since(fileStart))) - updateCh.Inc() + for _, filesGroup := range filesReplica { + updateCh.IncBy(int64(len(filesGroup.Files))) + } } }() if importErr := rc.fileImporter.ImportSSTFiles(ectx, filesReplica, rc.cipher, rc.dom.Store().GetCodec().GetAPIVersion()); importErr != nil { diff --git a/br/pkg/restore/snap_client/tikv_sender_test.go b/br/pkg/restore/snap_client/tikv_sender_test.go index b23ec6298d40f..1b72e56211fe8 100644 --- a/br/pkg/restore/snap_client/tikv_sender_test.go +++ b/br/pkg/restore/snap_client/tikv_sender_test.go @@ -225,8 +225,6 @@ func cptKey(tableID int64, startRow int, cf string) string { } func TestSortAndValidateFileRanges(t *testing.T) { - updateCh := MockUpdateCh{} - d := restoreutils.DefaultCFName w := restoreutils.WriteCFName cases := []struct { @@ -248,6 +246,8 @@ func TestSortAndValidateFileRanges(t *testing.T) { // expected result splitKeys [][]byte tableIDWithFilesGroups [][]snapclient.TableIDWithFiles + skipFileCount int64 + totalFileCount int64 }{ { // large sst, split-on-table, no checkpoint upstreamTableIDs: []int64{100, 200, 300}, @@ -277,6 +277,8 @@ func TestSortAndValidateFileRanges(t *testing.T) { {files(302, []int{1}, []string{w})}, {files(100, []int{1, 1}, []string{w, d})}, }, + 
skipFileCount: 0, + totalFileCount: 8, }, { // large sst, split-on-table, checkpoint upstreamTableIDs: []int64{100, 200, 300}, @@ -305,6 +307,8 @@ func TestSortAndValidateFileRanges(t *testing.T) { {files(302, []int{1}, []string{w})}, //{files(100, []int{1, 1}, []string{w, d})}, }, + skipFileCount: 4, + totalFileCount: 8, }, { // large sst, no split-on-table, no checkpoint upstreamTableIDs: []int64{100, 200, 300}, @@ -330,6 +334,8 @@ func TestSortAndValidateFileRanges(t *testing.T) { {files(302, []int{1}, []string{w})}, {files(100, []int{1, 1}, []string{w, d})}, }, + skipFileCount: 0, + totalFileCount: 8, }, { // large sst, no split-on-table, checkpoint upstreamTableIDs: []int64{100, 200, 300}, @@ -358,6 +364,8 @@ func TestSortAndValidateFileRanges(t *testing.T) { {files(302, []int{1}, []string{w})}, //{files(100, []int{1, 1}, []string{w, d})}, }, + skipFileCount: 4, + totalFileCount: 8, }, { // small sst 1, split-table, no checkpoint upstreamTableIDs: []int64{100, 200, 300}, @@ -383,6 +391,8 @@ func TestSortAndValidateFileRanges(t *testing.T) { {files(302, []int{1}, []string{w})}, {files(100, []int{1, 1}, []string{w, d})}, }, + skipFileCount: 0, + totalFileCount: 8, }, { // small sst 1, split-table, checkpoint upstreamTableIDs: []int64{100, 200, 300}, @@ -411,6 +421,8 @@ func TestSortAndValidateFileRanges(t *testing.T) { {files(302, []int{1}, []string{w})}, // {files(100, []int{1, 1}, []string{w, d})}, }, + skipFileCount: 4, + totalFileCount: 8, }, { // small sst 1, no split-table, no checkpoint upstreamTableIDs: []int64{100, 200, 300}, @@ -434,6 +446,8 @@ func TestSortAndValidateFileRanges(t *testing.T) { {files(202, []int{2, 2}, []string{w, d}), files(302, []int{1}, []string{w})}, {files(100, []int{1, 1}, []string{w, d})}, }, + skipFileCount: 0, + totalFileCount: 8, }, { // small sst 1, no split-table, checkpoint upstreamTableIDs: []int64{100, 200, 300}, @@ -459,6 +473,8 @@ func TestSortAndValidateFileRanges(t *testing.T) { {files(102, []int{1}, []string{w})}, 
{files(202, []int{2, 2}, []string{w, d}), files(302, []int{1}, []string{w})}, }, + skipFileCount: 4, + totalFileCount: 8, }, { // small sst 2, split-table, no checkpoint upstreamTableIDs: []int64{100, 200, 300}, @@ -481,6 +497,8 @@ func TestSortAndValidateFileRanges(t *testing.T) { {files(302, []int{1}, []string{w})}, {files(100, []int{1, 1}, []string{w, d})}, }, + skipFileCount: 0, + totalFileCount: 8, }, { // small sst 2, split-table, checkpoint upstreamTableIDs: []int64{100, 200, 300}, @@ -505,6 +523,8 @@ func TestSortAndValidateFileRanges(t *testing.T) { {files(202, []int{2, 2}, []string{w, d})}, {files(302, []int{1}, []string{w})}, }, + skipFileCount: 4, + totalFileCount: 8, }, { // small sst 2, no split-table, no checkpoint upstreamTableIDs: []int64{100, 200, 300}, @@ -528,6 +548,8 @@ func TestSortAndValidateFileRanges(t *testing.T) { {files(202, []int{1, 1, 2, 2}, []string{w, d, w, d})}, {files(302, []int{1}, []string{w}), files(100, []int{1, 1}, []string{w, d})}, }, + skipFileCount: 0, + totalFileCount: 8, }, { // small sst 2, no split-table, checkpoint upstreamTableIDs: []int64{100, 200, 300}, @@ -554,6 +576,8 @@ func TestSortAndValidateFileRanges(t *testing.T) { {files(202, []int{2, 2}, []string{w, d})}, {files(302, []int{1}, []string{w})}, }, + skipFileCount: 4, + totalFileCount: 8, }, { // small sst 3, no split-table, no checkpoint upstreamTableIDs: []int64{100, 200, 300}, @@ -576,6 +600,8 @@ func TestSortAndValidateFileRanges(t *testing.T) { {files(102, []int{1}, []string{w}), files(202, []int{1, 1, 2, 2}, []string{w, d, w, d})}, {files(302, []int{1}, []string{w}), files(100, []int{1, 1}, []string{w, d})}, }, + skipFileCount: 0, + totalFileCount: 8, }, { // small sst 3, no split-table, checkpoint upstreamTableIDs: []int64{100, 200, 300}, @@ -601,6 +627,8 @@ func TestSortAndValidateFileRanges(t *testing.T) { {files(102, []int{1}, []string{w}), files(202, []int{2, 2}, []string{w, d, w, d})}, {files(302, []int{1}, []string{w})}, }, + skipFileCount: 4, + 
totalFileCount: 8, }, { // small sst 4, no split-table, no checkpoint upstreamTableIDs: []int64{100, 200, 300}, @@ -624,6 +652,8 @@ func TestSortAndValidateFileRanges(t *testing.T) { {files(202, []int{2, 2}, []string{w, d}), files(302, []int{1}, []string{w})}, {files(100, []int{1, 1}, []string{w, d})}, }, + skipFileCount: 0, + totalFileCount: 8, }, { // small sst 4, no split-table, checkpoint upstreamTableIDs: []int64{100, 200, 300}, @@ -649,16 +679,20 @@ func TestSortAndValidateFileRanges(t *testing.T) { {files(102, []int{1}, []string{w})}, {files(202, []int{2, 2}, []string{w, d}), files(302, []int{1}, []string{w})}, }, + skipFileCount: 4, + totalFileCount: 8, }, } for i, cs := range cases { t.Log(i) createdTables := generateCreatedTables(t, cs.upstreamTableIDs, cs.upstreamPartitionIDs, downstreamID) - splitKeys, tableIDWithFilesGroups, err := snapclient.SortAndValidateFileRanges(createdTables, cs.files, cs.checkpointSetWithTableID, cs.splitSizeBytes, cs.splitKeyCount, cs.splitOnTable, updateCh) + splitKeys, tableIDWithFilesGroups, skipFileCount, totalFileCount, err := snapclient.SortAndValidateFileRanges(createdTables, cs.files, cs.checkpointSetWithTableID, cs.splitSizeBytes, cs.splitKeyCount, cs.splitOnTable) require.NoError(t, err) require.Equal(t, cs.splitKeys, splitKeys) require.Equal(t, len(cs.tableIDWithFilesGroups), len(tableIDWithFilesGroups)) + require.Equal(t, cs.skipFileCount, skipFileCount) + require.Equal(t, cs.totalFileCount, totalFileCount) for i, expectFilesGroup := range cs.tableIDWithFilesGroups { actualFilesGroup := tableIDWithFilesGroups[i] require.Equal(t, len(expectFilesGroup), len(actualFilesGroup)) diff --git a/br/pkg/task/restore.go b/br/pkg/task/restore.go index 5bee261ceff21..5063fce4d9463 100644 --- a/br/pkg/task/restore.go +++ b/br/pkg/task/restore.go @@ -100,7 +100,6 @@ const ( defaultStatsConcurrency = 12 defaultBatchFlushInterval = 16 * time.Second defaultFlagDdlBatchSize = 128 - resetSpeedLimitRetryTimes = 3 ) const ( @@ -1064,10 
+1063,6 @@ func runSnapshotRestore(c context.Context, mgr *conn.Mgr, g glue.Glue, cmdName s } } - rangeSize := EstimateRangeSize(files) - summary.CollectInt("restore ranges", rangeSize) - log.Info("range and file prepared", zap.Int("file count", len(files)), zap.Int("range count", rangeSize)) - // Do not reset timestamp if we are doing incremental restore, because // we are not allowed to decrease timestamp. if !client.IsIncremental() { @@ -1077,80 +1072,42 @@ func runSnapshotRestore(c context.Context, mgr *conn.Mgr, g glue.Glue, cmdName s } } - // Split/Scatter + Download/Ingest - progressLen := int64(rangeSize + len(files)) - if cfg.Checksum { - progressLen += int64(len(tables)) - } - if cfg.WaitTiflashReady { - progressLen += int64(len(tables)) - } - // Redirect to log if there is no log file to avoid unreadable output. - updateCh := g.StartProgress(ctx, cmdName, progressLen, !cfg.LogProgress) - defer updateCh.Close() - placementRuleManager, err := snapclient.NewPlacementRuleManager(ctx, mgr.GetPDClient(), mgr.GetPDHTTPClient(), mgr.GetTLSConfig(), cfg.Online) - if err != nil { - return errors.Trace(err) - } - if err := client.RestoreTables(ctx, placementRuleManager, createdTables, files, checkpointSetWithTableID, - kvConfigs.MergeRegionSize.Value, kvConfigs.MergeRegionKeyCount.Value, + rtCtx := snapclient.RestoreTablesContext{ + LogProgress: cfg.LogProgress, + SplitSizeBytes: kvConfigs.MergeRegionSize.Value, + SplitKeyCount: kvConfigs.MergeRegionKeyCount.Value, // If the command is from BR binary, the ddl.EnableSplitTableRegion is always 0, // If the command is from BRIE SQL, the ddl.EnableSplitTableRegion is TiDB config split-table. // Notice that `split-region-on-table` configure from TiKV split on the region having data, it may trigger after restore done. // It's recommended to enable TiDB configure `split-table` instead. 
- atomic.LoadUint32(&ddl.EnableSplitTableRegion) == 1, - updateCh, - ); err != nil { - return errors.Trace(err) - } + SplitOnTable: atomic.LoadUint32(&ddl.EnableSplitTableRegion) == 1, + Online: cfg.Online, - // We make bigger errCh so we won't block on multi-part failed. - errCh := make(chan error, 32) - postHandleCh := afterTableRestoredCh(ctx, createdTables) + CreatedTables: createdTables, + AllFiles: files, + CheckpointSetWithTableID: checkpointSetWithTableID, - // pipeline checksum - if cfg.Checksum { - postHandleCh = client.GoValidateChecksum( - ctx, postHandleCh, mgr.GetStorage().GetClient(), errCh, updateCh, cfg.ChecksumConcurrency) + Glue: g, } - - // pipeline update meta and load stats - postHandleCh = client.GoUpdateMetaAndLoadStats(ctx, s, postHandleCh, errCh, cfg.StatsConcurrency, cfg.LoadStats) - - // pipeline wait Tiflash synced - if cfg.WaitTiflashReady { - postHandleCh = client.GoWaitTiFlashReady(ctx, postHandleCh, updateCh, errCh) + if err := client.RestoreTables(ctx, rtCtx); err != nil { + return errors.Trace(err) } - finish := dropToBlackhole(ctx, postHandleCh, errCh) + plCtx := snapclient.PipelineContext{ + Checksum: cfg.Checksum, + LoadStats: cfg.LoadStats, + WaitTiflashReady: cfg.WaitTiflashReady, - // Reset speed limit. ResetSpeedLimit must be called after client.InitBackupMeta has been called. - defer func() { - var resetErr error - // In future we may need a mechanism to set speed limit in ttl. like what we do in switchmode. 
TODO - for retry := 0; retry < resetSpeedLimitRetryTimes; retry++ { - resetErr = client.ResetSpeedLimit(ctx) - if resetErr != nil { - log.Warn("failed to reset speed limit, retry it", - zap.Int("retry time", retry), logutil.ShortError(resetErr)) - time.Sleep(time.Duration(retry+3) * time.Second) - continue - } - break - } - if resetErr != nil { - log.Error("failed to reset speed limit, please reset it manually", zap.Error(resetErr)) - } - }() + LogProgress: cfg.LogProgress, + ChecksumConcurrency: cfg.ChecksumConcurrency, + StatsConcurrency: cfg.StatsConcurrency, - select { - case err = <-errCh: - err = multierr.Append(err, multierr.Combine(Exhaust(errCh)...)) - case <-finish: + KvClient: mgr.GetStorage().GetClient(), + ExtStorage: s, + Glue: g, } - - // If any error happened, return now. - if err != nil { + // Do some work in pipeline, such as checksum, load stats and wait tiflash ready. + if err := client.RestorePipeline(ctx, plCtx, createdTables); err != nil { return errors.Trace(err) } @@ -1305,21 +1262,6 @@ func checkDiskSpace(ctx context.Context, mgr *conn.Mgr, files []*backuppb.File, return nil } -// Exhaust drains all remaining errors in the channel, into a slice of errors. -func Exhaust(ec <-chan error) []error { - out := make([]error, 0, len(ec)) - for { - select { - case err := <-ec: - out = append(out, err) - default: - // errCh will NEVER be closed(ya see, it has multi sender-part), - // so we just consume the current backlog of this channel, then return. - return out - } - } -} - func checkTableExistence(ctx context.Context, mgr *conn.Mgr, tables []*metautil.Table, g glue.Glue) error { // Tasks from br clp client use other checks to validate if g.GetClient() != glue.ClientSql { @@ -1342,44 +1284,6 @@ func checkTableExistence(ctx context.Context, mgr *conn.Mgr, tables []*metautil. return nil } -// EstimateRangeSize estimates the total range count by file. 
-func EstimateRangeSize(files []*backuppb.File) int { - result := 0 - for _, f := range files { - if strings.HasSuffix(f.GetName(), "_write.sst") { - result++ - } - } - return result -} - -// dropToBlackhole drop all incoming tables into black hole, -// i.e. don't execute checksum, just increase the process anyhow. -func dropToBlackhole( - ctx context.Context, - inCh <-chan *snapclient.CreatedTable, - errCh chan<- error, -) <-chan struct{} { - outCh := make(chan struct{}, 1) - go func() { - defer func() { - close(outCh) - }() - for { - select { - case <-ctx.Done(): - errCh <- ctx.Err() - return - case _, ok := <-inCh: - if !ok { - return - } - } - } - }() - return outCh -} - // filterRestoreFiles filters tables that can't be processed after applying cfg.TableFilter.MatchTable. // if the db has no table that can be processed, the db will be filtered too. func filterRestoreFiles( @@ -1641,21 +1545,3 @@ func checkIsInActions(action model.ActionType, actions map[model.ActionType]stru _, ok := actions[action] return ok } - -func afterTableRestoredCh(ctx context.Context, createdTables []*snapclient.CreatedTable) <-chan *snapclient.CreatedTable { - outCh := make(chan *snapclient.CreatedTable) - - go func() { - defer close(outCh) - - for _, createdTable := range createdTables { - select { - case <-ctx.Done(): - return - default: - outCh <- createdTable - } - } - }() - return outCh -} diff --git a/br/pkg/task/stream.go b/br/pkg/task/stream.go index 6009eb47a6c70..20fbe8ceae5f0 100644 --- a/br/pkg/task/stream.go +++ b/br/pkg/task/stream.go @@ -1414,8 +1414,7 @@ func restoreStream( if err != nil { return err } - pm := g.StartProgress(ctx, "Restore Meta Files", int64(len(ddlFiles)), !cfg.LogProgress) - if err = withProgress(pm, func(p glue.Progress) error { + if err = glue.WithProgress(ctx, g, "Restore Meta Files", int64(len(ddlFiles)), !cfg.LogProgress, func(p glue.Progress) error { client.RunGCRowsLoader(ctx) return client.RestoreMetaKVFiles(ctx, ddlFiles, schemasReplace, 
updateStats, p.Inc) }); err != nil { @@ -1442,8 +1441,7 @@ func restoreStream( if err != nil { return errors.Trace(err) } - pd := g.StartProgress(ctx, "Restore KV Files", int64(dataFileCount), !cfg.LogProgress) - err = withProgress(pd, func(p glue.Progress) (pErr error) { + err = glue.WithProgress(ctx, g, "Restore KV Files", int64(dataFileCount), !cfg.LogProgress, func(p glue.Progress) (pErr error) { if cfg.UseCheckpoint { updateStatsWithCheckpoint := func(kvCount, size uint64) { mu.Lock() @@ -1601,12 +1599,6 @@ func checkLogRange(restoreFromTS, restoreToTS, logMinTS, logMaxTS uint64) error return nil } -// withProgress execute some logic with the progress, and close it once the execution done. -func withProgress(p glue.Progress, cc func(p glue.Progress) error) error { - defer p.Close() - return cc(p) -} - type backupLogInfo struct { logMaxTS uint64 logMinTS uint64