ddl: integrate fast create table into normal general DDL workflow #55025

Merged
merged 7 commits on Jul 31, 2024
pkg/ddl/BUILD.bazel (2 changes: 1 addition & 1 deletion)
@@ -189,7 +189,6 @@ go_library(
         "@com_github_tikv_pd_client//:client",
         "@com_github_tikv_pd_client//http",
         "@io_etcd_go_etcd_client_v3//:client",
-        "@io_etcd_go_etcd_client_v3//concurrency",
         "@org_golang_x_sync//errgroup",
         "@org_uber_go_atomic//:atomic",
         "@org_uber_go_zap//:zap",
@@ -225,6 +224,7 @@ go_test(
         "ddl_running_jobs_test.go",
         "ddl_test.go",
         "ddl_workerpool_test.go",
+        "executor_nokit_test.go",
         "executor_test.go",
         "export_test.go",
         "fail_test.go",
pkg/ddl/ddl.go (208 changes: 30 additions & 178 deletions)
@@ -66,7 +66,6 @@ import (
 	"github.com/pingcap/tidb/pkg/util/generic"
 	"github.com/tikv/client-go/v2/tikvrpc"
 	clientv3 "go.etcd.io/etcd/client/v3"
-	"go.etcd.io/etcd/client/v3/concurrency"
 	atomicutil "go.uber.org/atomic"
 	"go.uber.org/zap"
 )
@@ -83,7 +82,7 @@ const (
 
 	shardRowIDBitsMax = 15
 
-	batchAddingJobs = 10
+	batchAddingJobs = 100
 
 	reorgWorkerCnt   = 10
 	generalWorkerCnt = 10
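
Raising batchAddingJobs from 10 to 100 lets the job-submit limiter drain far more queued wrappers per round trip, which is what makes merging many concurrent CREATE TABLE statements worthwhile. A minimal sketch of the batching pattern, with a hypothetical helper name (the real loop is limitDDLJobs, which appears further down in this diff):

// collectBatch drains up to batchAddingJobs queued wrappers in one pass:
// block for the first job, then take whatever else is already queued.
func collectBatch(ch chan *JobWrapper) []*JobWrapper {
	jobs := make([]*JobWrapper, 0, batchAddingJobs)
	jobs = append(jobs, <-ch)
	for len(jobs) < batchAddingJobs {
		select {
		case jobW := <-ch:
			jobs = append(jobs, jobW)
		default:
			return jobs // queue drained; submit what we have
		}
	}
	return jobs
}
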
@@ -199,6 +198,15 @@ type DDL interface {
 	GetMinJobIDRefresher() *systable.MinJobIDRefresher
 }
 
+type jobSubmitResult struct {
+	err   error
+	jobID int64
+	// merged indicates whether the job is merged into another job together with
+	// other jobs. we only merge multiple create table jobs into one job when fast
+	// create table is enabled.
+	merged bool
+}
+
 // JobWrapper is used to wrap a job and some other information.
 // exported for testing.
 type JobWrapper struct {
@@ -207,9 +215,9 @@ type JobWrapper struct {
 	// exported for test.
 	IDAllocated bool
 	// job submission is run in async, we use this channel to notify the caller.
-	// for local job we might combine multiple jobs into one, append the ErrChs to
-	// this slice.
-	ErrChs []chan error
+	// when fast create table enabled, we might combine multiple jobs into one, and
+	// append the channel to this slice.
+	ResultCh []chan jobSubmitResult
 	cacheErr error
 }
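
A caller that submitted a job sees exactly one jobSubmitResult on its own channel, whether or not the job was merged. A hypothetical caller-side sketch, written as if in the same package as JobWrapper:

// waitSubmit blocks until the submit worker reports back on this wrapper's
// first (and, for an unmerged job, only) result channel.
func waitSubmit(jobW *JobWrapper) (int64, error) {
	res := <-jobW.ResultCh[0]
	if res.merged {
		// the job was folded into a combined CREATE TABLE job; res.jobID is
		// the ID of the job that actually ran.
		logutil.DDLLogger().Info("job merged", zap.Int64("jobID", res.jobID))
	}
	return res.jobID, res.err
}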

@@ -219,14 +227,19 @@ func NewJobWrapper(job *model.Job, idAllocated bool) *JobWrapper {
 	return &JobWrapper{
 		Job:         job,
 		IDAllocated: idAllocated,
-		ErrChs:      []chan error{make(chan error)},
+		ResultCh:    []chan jobSubmitResult{make(chan jobSubmitResult)},
 	}
 }
 
-// NotifyError notifies the error to all error channels.
-func (t *JobWrapper) NotifyError(err error) {
-	for _, errCh := range t.ErrChs {
-		errCh <- err
+// NotifyResult notifies the job submit result.
+func (t *JobWrapper) NotifyResult(err error) {
+	merged := len(t.ResultCh) > 1
+	for _, resultCh := range t.ResultCh {
+		resultCh <- jobSubmitResult{
+			err:    err,
+			jobID:  t.ID,
+			merged: merged,
+		}
 	}
 }
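
NotifyResult's fan-out works because merging appends every absorbed wrapper's result channel onto the surviving wrapper. The merge step itself is not part of this file's diff; a hypothetical sketch of what it has to do:

// mergeCreateTableJobs folds several CREATE TABLE wrappers into the first one
// and carries all result channels, so NotifyResult can answer every caller.
// (The real code must also splice the table infos into one multi-table job.)
func mergeCreateTableJobs(wrappers []*JobWrapper) *JobWrapper {
	combined := wrappers[0]
	for _, w := range wrappers[1:] {
		combined.ResultCh = append(combined.ResultCh, w.ResultCh...)
	}
	return combined
}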

@@ -235,8 +248,6 @@ type ddl struct {
 	m          sync.RWMutex
 	wg         tidbutil.WaitGroupWrapper // It's only used to deal with data race in restart_test.
 	limitJobCh chan *JobWrapper
-	// limitJobChV2 is used to limit the number of jobs being executed in local worker.
-	limitJobChV2 chan *JobWrapper
 
 	*ddlCtx
 	sessPool *sess.Pool
@@ -361,32 +372,17 @@ type hookStruct struct {
 	hook Callback
 }
 
-// the schema synchronization mechanism now requires strict incremental schema versions.
-// Therefore, we require a distributed lock to ensure the sequential commit of schema diffs from different TiDB nodes.
-type etcdLockInfo struct {
-	se *concurrency.Session
-	mu *concurrency.Mutex
-}
-
 // schemaVersionManager is used to manage the schema version. To prevent the conflicts on this key between different DDL job,
 // we use another transaction to update the schema version, so that we need to lock the schema version and unlock it until the job is committed.
-// for version2, we use etcd lock to lock the schema version between TiDB nodes now.
 type schemaVersionManager struct {
 	schemaVersionMu sync.Mutex
 	// lockOwner stores the job ID that is holding the lock.
 	lockOwner atomicutil.Int64
-
-	ctx          context.Context
-	etcdClient   *clientv3.Client
-	lockInfoMaps map[int64]*etcdLockInfo
 }
 
-func newSchemaVersionManager(ctx context.Context, etcdClient *clientv3.Client) *schemaVersionManager {
-	return &schemaVersionManager{
-		ctx:          ctx,
-		etcdClient:   etcdClient,
-		lockInfoMaps: make(map[int64]*etcdLockInfo),
-	}
+func newSchemaVersionManager() *schemaVersionManager {
+	return &schemaVersionManager{}
 }
 
 func (sv *schemaVersionManager) setSchemaVersion(job *model.Job, store kv.Storage) (schemaVersion int64, err error) {
@@ -411,17 +407,6 @@ func (sv *schemaVersionManager) lockSchemaVersion(jobID int64) error {
 	if ownerID != jobID {
 		sv.schemaVersionMu.Lock()
 		sv.lockOwner.Store(jobID)
-		if sv.etcdClient != nil && variable.EnableFastCreateTable.Load() {
-			se, err := concurrency.NewSession(sv.etcdClient)
-			if err != nil {
-				return errors.Trace(err)
-			}
-			mu := concurrency.NewMutex(se, ddlSchemaVersionKeyLock)
-			if err := mu.Lock(sv.ctx); err != nil {
-				return errors.Trace(err)
-			}
-			sv.lockInfoMaps[jobID] = &etcdLockInfo{se: se, mu: mu}
-		}
 	}
 	return nil
 }
@@ -430,24 +415,6 @@ func (sv *schemaVersionManager) lockSchemaVersion(jobID int64) error {
 func (sv *schemaVersionManager) unlockSchemaVersion(jobID int64) {
 	ownerID := sv.lockOwner.Load()
 	if ownerID == jobID {
-		if lockInfo, ok := sv.lockInfoMaps[jobID]; ok {
-			delete(sv.lockInfoMaps, jobID)
-			err := lockInfo.mu.Unlock(sv.ctx)
-		outer:
-			for err != nil {
-				logutil.DDLLogger().Error("unlock schema version", zap.Error(err))
-				select {
-				case <-sv.ctx.Done():
-					break outer
-				case <-time.After(time.Second):
-				}
-				// retry unlock
-				err = lockInfo.mu.Unlock(sv.ctx)
-			}
-			if err := lockInfo.se.Close(); err != nil {
-				logutil.DDLLogger().Error("close etcd session", zap.Error(err))
-			}
-		}
 		sv.lockOwner.Store(0)
 		sv.schemaVersionMu.Unlock()
 	}
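
The schemaVersionManager comment above pins down the discipline this PR simplifies: bump the version in its own transaction, and hold the (now purely in-process) lock until the job's transaction commits. A minimal sketch of that call pattern, assuming a hypothetical runJobTxn helper:

// applySchemaChange sketches the intended lock lifecycle around a schema
// version bump. runJobTxn is a hypothetical stand-in for committing the job
// in its own transaction.
func applySchemaChange(sv *schemaVersionManager, job *model.Job, store kv.Storage) error {
	if err := sv.lockSchemaVersion(job.ID); err != nil {
		return err
	}
	// hold the mutex until the job transaction is done, so schema versions
	// from concurrent jobs commit in strictly increasing order
	defer sv.unlockSchemaVersion(job.ID)

	ver, err := sv.setSchemaVersion(job, store) // separate txn allocates the version
	if err != nil {
		return err
	}
	return runJobTxn(job, ver) // hypothetical
}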
@@ -575,11 +542,10 @@ func (dc *ddlCtx) initJobDoneCh(jobID int64) {
 }
 
 func (dc *ddlCtx) notifyJobDone(jobID int64) {
-	if ch, ok := dc.ddlJobDoneChMap.Load(jobID); ok {
-		select {
-		case ch <- struct{}{}:
-		default:
-		}
+	if ch, ok := dc.ddlJobDoneChMap.Delete(jobID); ok {
+		// broadcast done event as we might merge multiple jobs into one when fast
+		// create table is enabled.
+		close(ch)
 	}
 }
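
Switching from a non-blocking single send to close(ch) turns the done notification into a broadcast: every waiter on a merged job's channel wakes up, and deleting the map entry in the same step ensures the channel cannot be closed twice. A self-contained illustration of the broadcast behavior:

package main

import (
	"fmt"
	"sync"
)

// Each merged CREATE TABLE submitter waits on the same done channel; close()
// releases all of them at once, whereas a single send would wake only one.
func main() {
	done := make(chan struct{})
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			<-done // all three unblock when the channel is closed
			fmt.Printf("waiter %d: job done\n", id)
		}(i)
	}
	close(done) // broadcast: every receiver observes the closed channel
	wg.Wait()
}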

@@ -695,12 +661,11 @@ func newDDL(ctx context.Context, options ...Option) (*ddl, *executor) {
 	ddlCtx.mu.hook = opt.Hook
 	ctx = kv.WithInternalSourceType(ctx, kv.InternalTxnDDL)
 	ddlCtx.ctx, ddlCtx.cancel = context.WithCancel(ctx)
-	ddlCtx.schemaVersionManager = newSchemaVersionManager(ddlCtx.ctx, opt.EtcdCli)
+	ddlCtx.schemaVersionManager = newSchemaVersionManager()
 
 	d := &ddl{
 		ddlCtx:            ddlCtx,
 		limitJobCh:        make(chan *JobWrapper, batchAddingJobs),
-		limitJobChV2:      make(chan *JobWrapper, batchAddingJobs),
 		enableTiFlashPoll: atomicutil.NewBool(true),
 		ddlJobNotifyCh:    make(chan struct{}, 100),
 		localJobCh:        make(chan *JobWrapper, 1),
@@ -721,7 +686,6 @@ func newDDL(ctx context.Context, options ...Option) (*ddl, *executor) {
 	variable.EnableDDL = d.EnableDDL
 	variable.DisableDDL = d.DisableDDL
 	variable.SwitchMDL = d.SwitchMDL
-	variable.SwitchFastCreateTable = d.SwitchFastCreateTable
 
 	e := &executor{
 		ctx: d.ctx,
@@ -734,7 +698,6 @@ func newDDL(ctx context.Context, options ...Option) (*ddl, *executor) {
 		schemaLoader:    d.schemaLoader,
 		lease:           d.lease,
 		ownerManager:    d.ownerManager,
-		limitJobChV2:    d.limitJobChV2,
 		ddlJobDoneChMap: &d.ddlJobDoneChMap,
 		ddlJobNotifyCh:  d.ddlJobNotifyCh,
 		mu:              &d.mu,
@@ -806,9 +769,6 @@ func (d *ddl) Start(ctxPool *pools.ResourcePool) error {
 	d.wg.Run(func() {
 		d.limitDDLJobs(d.limitJobCh, d.addBatchDDLJobsV1)
 	})
-	d.wg.Run(func() {
-		d.limitDDLJobs(d.limitJobChV2, d.addBatchLocalDDLJobs)
-	})
 	d.wg.Run(func() {
 		d.minJobIDRefresher.Start(d.ctx)
 	})
@@ -1065,114 +1025,6 @@ func (d *ddl) SwitchMDL(enable bool) error {
 	return nil
 }
 
-// SwitchFastCreateTable switch fast create table
-func (d *ddl) SwitchFastCreateTable(val bool) error {
-	old := variable.EnableFastCreateTable.Load()
-	if old == val {
-		return nil
-	}
-	ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
-	defer cancel()
-
-	// Check if there is any DDL running.
-	// This check can not cover every corner cases, so users need to guarantee that there is no DDL running by themselves.
-	sessCtx, err := d.sessPool.Get()
-	if err != nil {
-		return errors.Trace(err)
-	}
-	defer d.sessPool.Put(sessCtx)
-	se := sess.NewSession(sessCtx)
-	rows, err := se.Execute(ctx, "select 1 from mysql.tidb_ddl_job", "check job")
-	if err != nil {
-		return errors.Trace(err)
-	}
-	if len(rows) != 0 {
-		return errors.New("please wait for all jobs done")
-	}
-
-	if err := d.switchFastCreateTable(val); err != nil {
-		return errors.Trace(err)
-	}
-
-	variable.EnableFastCreateTable.Store(val)
-	logutil.DDLLogger().Info("switch fast create table", zap.Bool("val", val))
-	return nil
-}
-
-// disableFastCreateTable disable fast create table feature.
-func (*ddl) disableFastCreateTable(m *meta.Meta) error {
-	fastCreateTableInitialized, err := m.GetFastCreateTableInitialized()
-	if err != nil {
-		return errors.Trace(err)
-	}
-	if !fastCreateTableInitialized {
-		return nil
-	}
-	if err := m.ClearAllDatabaseNames(); err != nil {
-		return errors.Trace(err)
-	}
-	if err := m.ClearAllTableNames(); err != nil {
-		return errors.Trace(err)
-	}
-	return errors.Trace(m.SetFastCreateTableInitialized(false))
-}
-
-// enableFastCreateTable enable fast create table feature.
-func (*ddl) enableFastCreateTable(m *meta.Meta) error {
-	fastCreateTableInitialized, err := m.GetFastCreateTableInitialized()
-	if err != nil {
-		return errors.Trace(err)
-	}
-	if fastCreateTableInitialized {
-		return nil
-	}
-
-	if err := m.ClearAllDatabaseNames(); err != nil {
-		return errors.Trace(err)
-	}
-	if err := m.ClearAllTableNames(); err != nil {
-		return errors.Trace(err)
-	}
-
-	dbs, err := m.ListDatabases()
-	if err != nil {
-		return errors.Trace(err)
-	}
-
-	for _, dbInfo := range dbs {
-		if err := m.CreateDatabaseName(dbInfo.Name.L, dbInfo.ID); err != nil {
-			return errors.Trace(err)
-		}
-	}
-
-	for _, dbInfo := range dbs {
-		tables, err := m.ListTables(dbInfo.ID)
-		if err != nil {
-			return errors.Trace(err)
-		}
-		for _, tableInfo := range tables {
-			if err := m.CreateTableName(dbInfo.Name.L, tableInfo.Name.L, tableInfo.ID); err != nil {
-				return errors.Trace(err)
-			}
-		}
-	}
-
-	return errors.Trace(m.SetFastCreateTableInitialized(true))
-}
-
-func (d *ddl) switchFastCreateTable(val bool) (err error) {
-	return kv.RunInNewTxn(kv.WithInternalSourceType(d.ctx, kv.InternalTxnDDL), d.store, true, func(_ context.Context, txn kv.Transaction) error {
-		m := meta.NewMeta(txn)
-
-		if val {
-			err = d.enableFastCreateTable(m)
-		} else {
-			err = d.disableFastCreateTable(m)
-		}
-		return errors.Trace(err)
-	})
-}
-
 // RecoverInfo contains information needed by DDL.RecoverTable.
 type RecoverInfo struct {
 	SchemaID int64