Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master' into rename-table-args
Browse files Browse the repository at this point in the history
  • Loading branch information
joccau committed Sep 16, 2024
2 parents 4220409 + 26443da commit a1d1233
Show file tree
Hide file tree
Showing 156 changed files with 5,649 additions and 2,371 deletions.
72 changes: 36 additions & 36 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -6010,13 +6010,13 @@ def go_deps():
name = "com_github_pingcap_kvproto",
build_file_proto_mode = "disable_global",
importpath = "github.com/pingcap/kvproto",
sha256 = "368b662c8669d91bcd488b780b8ecb273855b8fc54c1043907171bdebc3ffc54",
strip_prefix = "github.com/pingcap/[email protected]20240904041139-1de8accd5bb7",
sha256 = "701b1e4f0b7c5007ef20ac9e8ae7f97add4edea1308c2f41a50ce6639905c9a7",
strip_prefix = "github.com/pingcap/[email protected]20240910154453-b242104f8d31",
urls = [
"http://bazel-cache.pingcap.net:8080/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20240904041139-1de8accd5bb7.zip",
"http://ats.apps.svc/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20240904041139-1de8accd5bb7.zip",
"https://cache.hawkingrei.com/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20240904041139-1de8accd5bb7.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20240904041139-1de8accd5bb7.zip",
"http://bazel-cache.pingcap.net:8080/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20240910154453-b242104f8d31.zip",
"http://ats.apps.svc/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20240910154453-b242104f8d31.zip",
"https://cache.hawkingrei.com/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20240910154453-b242104f8d31.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/pingcap/kvproto/com_github_pingcap_kvproto-v0.0.0-20240910154453-b242104f8d31.zip",
],
)
go_repository(
Expand Down Expand Up @@ -7206,26 +7206,26 @@ def go_deps():
name = "com_github_tikv_client_go_v2",
build_file_proto_mode = "disable_global",
importpath = "github.com/tikv/client-go/v2",
sha256 = "2c26a7a94e44e2aae520f2013f8d738c5c5f1fb9f70b76894843f6827ce945f7",
strip_prefix = "github.com/tikv/client-go/[email protected].20240821073530-75e3705e58f1",
sha256 = "8e1b11f3d4105df1114446700c91aab534967165c1c586ffbb51f94037b3c165",
strip_prefix = "github.com/tikv/client-go/[email protected].20240911041506-e7894a7b27ba",
urls = [
"http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20240821073530-75e3705e58f1.zip",
"http://ats.apps.svc/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20240821073530-75e3705e58f1.zip",
"https://cache.hawkingrei.com/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20240821073530-75e3705e58f1.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20240821073530-75e3705e58f1.zip",
"http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20240911041506-e7894a7b27ba.zip",
"http://ats.apps.svc/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20240911041506-e7894a7b27ba.zip",
"https://cache.hawkingrei.com/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20240911041506-e7894a7b27ba.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20240911041506-e7894a7b27ba.zip",
],
)
go_repository(
name = "com_github_tikv_pd_client",
build_file_proto_mode = "disable_global",
importpath = "github.com/tikv/pd/client",
sha256 = "bb4aa99260c2d1b22054d539cb37c18ce276c96b1b7c4a8f14ac4cbcda829654",
strip_prefix = "github.com/tikv/pd/[email protected]20240805092608-838ee7983b78",
sha256 = "fc2042d3b3c753de90ac2afdfea97b663f3043aa716f1ee56d2fdf98864e3cbd",
strip_prefix = "github.com/tikv/pd/[email protected]20240914083230-71f6f96816e9",
urls = [
"http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20240805092608-838ee7983b78.zip",
"http://ats.apps.svc/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20240805092608-838ee7983b78.zip",
"https://cache.hawkingrei.com/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20240805092608-838ee7983b78.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20240805092608-838ee7983b78.zip",
"http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20240914083230-71f6f96816e9.zip",
"http://ats.apps.svc/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20240914083230-71f6f96816e9.zip",
"https://cache.hawkingrei.com/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20240914083230-71f6f96816e9.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20240914083230-71f6f96816e9.zip",
],
)
go_repository(
Expand Down Expand Up @@ -10300,13 +10300,13 @@ def go_deps():
name = "org_golang_x_crypto",
build_file_proto_mode = "disable_global",
importpath = "golang.org/x/crypto",
sha256 = "ec96acfe28be3ff2fb14201c5f51132f0e24c7d0d6f3201a8aa69c84f989d014",
strip_prefix = "golang.org/x/crypto@v0.26.0",
sha256 = "c724b619b457bb1c445a39541449b1348eb3851323a29d2c313ad0139634d0a5",
strip_prefix = "golang.org/x/crypto@v0.27.0",
urls = [
"http://bazel-cache.pingcap.net:8080/gomod/golang.org/x/crypto/org_golang_x_crypto-v0.26.0.zip",
"http://ats.apps.svc/gomod/golang.org/x/crypto/org_golang_x_crypto-v0.26.0.zip",
"https://cache.hawkingrei.com/gomod/golang.org/x/crypto/org_golang_x_crypto-v0.26.0.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/golang.org/x/crypto/org_golang_x_crypto-v0.26.0.zip",
"http://bazel-cache.pingcap.net:8080/gomod/golang.org/x/crypto/org_golang_x_crypto-v0.27.0.zip",
"http://ats.apps.svc/gomod/golang.org/x/crypto/org_golang_x_crypto-v0.27.0.zip",
"https://cache.hawkingrei.com/gomod/golang.org/x/crypto/org_golang_x_crypto-v0.27.0.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/golang.org/x/crypto/org_golang_x_crypto-v0.27.0.zip",
],
)
go_repository(
Expand Down Expand Up @@ -10469,26 +10469,26 @@ def go_deps():
name = "org_golang_x_term",
build_file_proto_mode = "disable_global",
importpath = "golang.org/x/term",
sha256 = "2597a62b487b952c11c89b2001551af1fe1d29c484388ec1c3f5e3be7ff58ba5",
strip_prefix = "golang.org/x/term@v0.23.0",
sha256 = "80b2b247641ca1b8f54769de20d4d6a0c6861de57a3bc1b31e8432a2a7483ab3",
strip_prefix = "golang.org/x/term@v0.24.0",
urls = [
"http://bazel-cache.pingcap.net:8080/gomod/golang.org/x/term/org_golang_x_term-v0.23.0.zip",
"http://ats.apps.svc/gomod/golang.org/x/term/org_golang_x_term-v0.23.0.zip",
"https://cache.hawkingrei.com/gomod/golang.org/x/term/org_golang_x_term-v0.23.0.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/golang.org/x/term/org_golang_x_term-v0.23.0.zip",
"http://bazel-cache.pingcap.net:8080/gomod/golang.org/x/term/org_golang_x_term-v0.24.0.zip",
"http://ats.apps.svc/gomod/golang.org/x/term/org_golang_x_term-v0.24.0.zip",
"https://cache.hawkingrei.com/gomod/golang.org/x/term/org_golang_x_term-v0.24.0.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/golang.org/x/term/org_golang_x_term-v0.24.0.zip",
],
)
go_repository(
name = "org_golang_x_text",
build_file_proto_mode = "disable_global",
importpath = "golang.org/x/text",
sha256 = "48464f2ab2f988ca8b7b0a9d098e3664224c3b128629b5a9cc08025ee4a7e4ec",
strip_prefix = "golang.org/x/text@v0.17.0",
sha256 = "09da08281c6854e695cdffb25569df0abf53fe545c6610be09d58294728e81e5",
strip_prefix = "golang.org/x/text@v0.18.0",
urls = [
"http://bazel-cache.pingcap.net:8080/gomod/golang.org/x/text/org_golang_x_text-v0.17.0.zip",
"http://ats.apps.svc/gomod/golang.org/x/text/org_golang_x_text-v0.17.0.zip",
"https://cache.hawkingrei.com/gomod/golang.org/x/text/org_golang_x_text-v0.17.0.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/golang.org/x/text/org_golang_x_text-v0.17.0.zip",
"http://bazel-cache.pingcap.net:8080/gomod/golang.org/x/text/org_golang_x_text-v0.18.0.zip",
"http://ats.apps.svc/gomod/golang.org/x/text/org_golang_x_text-v0.18.0.zip",
"https://cache.hawkingrei.com/gomod/golang.org/x/text/org_golang_x_text-v0.18.0.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/golang.org/x/text/org_golang_x_text-v0.18.0.zip",
],
)
go_repository(
Expand Down
3 changes: 3 additions & 0 deletions OWNERS
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ approvers:
- winoros
- WizardXiao
- wjhuang2016
- wk989898
- wshwsh12
- xhebox
- xiongjiwei
Expand Down Expand Up @@ -127,6 +128,7 @@ reviewers:
- dhysum
- fengou1
- fzzf678
- ghazalfamilyusa
- iamxy
- JmPotato
- js00070
Expand All @@ -145,6 +147,7 @@ reviewers:
- shihongzhi
- spongedu
- tangwz
- terry1purcell
- Tjianke
- TonsnakeLin
- tsthght
Expand Down
10 changes: 9 additions & 1 deletion br/pkg/restore/log_client/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ go_library(
"//pkg/util/redact",
"//pkg/util/table-filter",
"@com_github_fatih_color//:color",
"@com_github_gogo_protobuf//proto",
"@com_github_opentracing_opentracing_go//:opentracing-go",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
Expand Down Expand Up @@ -82,9 +83,10 @@ go_test(
],
embed = [":log_client"],
flaky = True,
shard_count = 40,
shard_count = 41,
deps = [
"//br/pkg/errors",
"//br/pkg/glue",
"//br/pkg/gluetidb",
"//br/pkg/mock",
"//br/pkg/restore/internal/import_client",
Expand All @@ -97,10 +99,16 @@ go_test(
"//br/pkg/utiltest",
"//pkg/domain",
"//pkg/kv",
"//pkg/planner/core/resolve",
"//pkg/session",
"//pkg/sessionctx",
"//pkg/store/pdtypes",
"//pkg/tablecodec",
"//pkg/testkit",
"//pkg/testkit/testsetup",
"//pkg/util/chunk",
"//pkg/util/codec",
"//pkg/util/sqlexec",
"//pkg/util/table-filter",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
Expand Down
90 changes: 62 additions & 28 deletions br/pkg/restore/log_client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"time"

"github.com/fatih/color"
"github.com/gogo/protobuf/proto"
"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
Expand Down Expand Up @@ -95,6 +96,8 @@ type LogClient struct {
// Can not use `restoreTS` directly, because schema created in `full backup` maybe is new than `restoreTS`.
currentTS uint64

upstreamClusterID uint64

*LogFileManager

workerPool *tidbutil.WorkerPool
Expand Down Expand Up @@ -167,6 +170,11 @@ func (rc *LogClient) SetConcurrency(c uint) {
rc.workerPool = tidbutil.NewWorkerPool(c, "file")
}

func (rc *LogClient) SetUpstreamClusterID(upstreamClusterID uint64) {
log.Info("upstream cluster id", zap.Uint64("cluster id", upstreamClusterID))
rc.upstreamClusterID = upstreamClusterID
}

func (rc *LogClient) SetStorage(ctx context.Context, backend *backuppb.StorageBackend, opts *storage.ExternalStorageOptions) error {
var err error
rc.storage, err = storage.New(ctx, backend, opts)
Expand Down Expand Up @@ -558,24 +566,38 @@ func (rc *LogClient) RestoreKVFiles(

func (rc *LogClient) initSchemasMap(
ctx context.Context,
clusterID uint64,
restoreTS uint64,
) ([]*backuppb.PitrDBMap, error) {
filename := metautil.PitrIDMapsFilename(clusterID, restoreTS)
exist, err := rc.storage.FileExists(ctx, filename)
if err != nil {
return nil, errors.Annotatef(err, "failed to check filename:%s ", filename)
} else if !exist {
log.Info("pitr id maps isn't existed", zap.String("file", filename))
getPitrIDMapSQL := "SELECT segment_id, id_map FROM mysql.tidb_pitr_id_map WHERE restored_ts = %? and upstream_cluster_id = %? ORDER BY segment_id;"
execCtx := rc.se.GetSessionCtx().GetRestrictedSQLExecutor()
rows, _, errSQL := execCtx.ExecRestrictedSQL(
kv.WithInternalSourceType(ctx, kv.InternalTxnBR),
nil,
getPitrIDMapSQL,
restoreTS,
rc.upstreamClusterID,
)
if errSQL != nil {
return nil, errors.Annotatef(errSQL, "failed to get pitr id map from mysql.tidb_pitr_id_map")
}
if len(rows) == 0 {
log.Info("pitr id map does not exist", zap.Uint64("restored ts", restoreTS))
return nil, nil
}

metaData, err := rc.storage.ReadFile(ctx, filename)
if err != nil {
return nil, errors.Trace(err)
metaData := make([]byte, 0, len(rows)*PITRIdMapBlockSize)
for i, row := range rows {
elementID := row.GetUint64(0)
if uint64(i) != elementID {
return nil, errors.Errorf("the part(segment_id = %d) of pitr id map is lost", i)
}
d := row.GetBytes(1)
if len(d) == 0 {
return nil, errors.Errorf("get the empty part(segment_id = %d) of pitr id map", i)
}
metaData = append(metaData, d...)
}
backupMeta := &backuppb.BackupMeta{}
if err = backupMeta.Unmarshal(metaData); err != nil {
if err := backupMeta.Unmarshal(metaData); err != nil {
return nil, errors.Trace(err)
}

Expand Down Expand Up @@ -722,7 +744,7 @@ func (rc *LogClient) InitSchemasReplaceForDDL(
if !cfg.IsNewTask {
log.Info("try to load pitr id maps")
needConstructIdMap = false
dbMaps, err = rc.initSchemasMap(ctx, rc.GetClusterID(ctx), rc.restoreTS)
dbMaps, err = rc.initSchemasMap(ctx, rc.restoreTS)
if err != nil {
return nil, errors.Trace(err)
}
Expand All @@ -733,7 +755,7 @@ func (rc *LogClient) InitSchemasReplaceForDDL(
if len(dbMaps) <= 0 && cfg.FullBackupStorage == nil {
log.Info("try to load pitr id maps of the previous task", zap.Uint64("start-ts", rc.startTS))
needConstructIdMap = true
dbMaps, err = rc.initSchemasMap(ctx, rc.GetClusterID(ctx), rc.startTS)
dbMaps, err = rc.initSchemasMap(ctx, rc.startTS)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -887,7 +909,7 @@ func (rc *LogClient) PreConstructAndSaveIDMap(
return errors.Trace(err)
}

if err := rc.SaveIDMap(ctx, sr); err != nil {
if err := rc.saveIDMap(ctx, sr); err != nil {
return errors.Trace(err)
}
return nil
Expand Down Expand Up @@ -1491,24 +1513,36 @@ func (rc *LogClient) GetGCRows() []*stream.PreDelRangeQuery {
return rc.deleteRangeQuery
}

// SaveIDMap saves the id mapping information.
func (rc *LogClient) SaveIDMap(
const PITRIdMapBlockSize int = 524288

// saveIDMap saves the id mapping information.
func (rc *LogClient) saveIDMap(
ctx context.Context,
sr *stream.SchemasReplace,
) error {
idMaps := sr.TidySchemaMaps()
clusterID := rc.GetClusterID(ctx)
metaFileName := metautil.PitrIDMapsFilename(clusterID, rc.restoreTS)
metaWriter := metautil.NewMetaWriter(rc.storage, metautil.MetaFileSize, false, metaFileName, nil)
metaWriter.Update(func(m *backuppb.BackupMeta) {
// save log startTS to backupmeta file
m.ClusterId = clusterID
m.DbMaps = idMaps
})

if err := metaWriter.FlushBackupMeta(ctx); err != nil {
backupmeta := &backuppb.BackupMeta{DbMaps: sr.TidySchemaMaps()}
data, err := proto.Marshal(backupmeta)
if err != nil {
return errors.Trace(err)
}
// clean the dirty id map at first
err = rc.se.ExecuteInternal(ctx, "DELETE FROM mysql.tidb_pitr_id_map WHERE restored_ts = %? and upstream_cluster_id = %?;", rc.restoreTS, rc.upstreamClusterID)
if err != nil {
return errors.Trace(err)
}
replacePitrIDMapSQL := "REPLACE INTO mysql.tidb_pitr_id_map (restored_ts, upstream_cluster_id, segment_id, id_map) VALUES (%?, %?, %?, %?);"
for startIdx, segmentId := 0, 0; startIdx < len(data); segmentId += 1 {
endIdx := startIdx + PITRIdMapBlockSize
if endIdx > len(data) {
endIdx = len(data)
}
err := rc.se.ExecuteInternal(ctx, replacePitrIDMapSQL, rc.restoreTS, rc.upstreamClusterID, segmentId, data[startIdx:endIdx])
if err != nil {
return errors.Trace(err)
}
startIdx = endIdx
}

if rc.useCheckpoint {
var items map[int64]model.TiFlashReplicaInfo
if sr.TiflashRecorder != nil {
Expand Down
Loading

0 comments on commit a1d1233

Please sign in to comment.