ddl: decouple job seq number from job history & reset its allocator on owner change (#54774)

ref #54436
D3Hunter authored Jul 23, 2024
1 parent d662428 commit cdd7c9e
Showing 5 changed files with 36 additions and 63 deletions.
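
The heart of the change is visible in the diffs below: the per-owner DDL sequence number becomes a plain in-memory atomic.Uint64 owned by the job scheduler and shared by its workers, replacing a mutex-guarded counter that was seeded from the DDL history count. A minimal stand-alone sketch of that pattern, using illustrative names rather than the real TiDB types:

package main

import (
	"fmt"
	"sync/atomic"
)

// scheduler stands in for jobScheduler: it owns the sequence allocator for one
// owner tenure. A new scheduler starts from zero, so the first finished job
// gets SeqNum 1.
type scheduler struct {
	seqAllocator atomic.Uint64
}

// worker stands in for the DDL worker: it only keeps a pointer to its
// scheduler's allocator and bumps it when a job is moved into history.
type worker struct {
	seqAllocator *atomic.Uint64
}

func (w *worker) finishJob() uint64 {
	// Add returns the new value, so allocating and recording the sequence
	// number needs no explicit locking.
	return w.seqAllocator.Add(1)
}

func main() {
	s := &scheduler{}
	w1 := &worker{seqAllocator: &s.seqAllocator}
	w2 := &worker{seqAllocator: &s.seqAllocator}
	fmt.Println(w1.finishJob(), w2.finishJob()) // 1 2

	// Owner change: the old scheduler is closed, a new one is created, and the
	// sequence implicitly restarts from 1.
	s2 := &scheduler{}
	w3 := &worker{seqAllocator: &s2.seqAllocator}
	fmt.Println(w3.finishJob()) // 1
}

Because a fresh scheduler, and with it a fresh allocator, is created whenever DDL ownership moves, the counter restarts at 1 for the new owner, which is what the updated test below asserts.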
22 changes: 18 additions & 4 deletions pkg/ddl/db_integration_test.go
@@ -48,6 +48,7 @@ import (
"github.com/pingcap/tidb/pkg/tablecodec"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/pingcap/tidb/pkg/testkit/external"
"github.com/pingcap/tidb/pkg/testkit/testfailpoint"
"github.com/pingcap/tidb/pkg/util/collate"
contextutil "github.com/pingcap/tidb/pkg/util/context"
"github.com/pingcap/tidb/pkg/util/dbterror"
@@ -2944,10 +2945,11 @@ func TestDDLLastInfo(t *testing.T) {

tk := testkit.NewTestKit(t, store)
tk.MustExec(`use test;`)
tk.MustQuery("select json_extract(@@tidb_last_ddl_info, '$.query'), json_extract(@@tidb_last_ddl_info, '$.seq_num')").Check(testkit.Rows("\"\" 0"))
lastDDLSQL := "select json_extract(@@tidb_last_ddl_info, '$.query'), json_extract(@@tidb_last_ddl_info, '$.seq_num')"
tk.MustQuery(lastDDLSQL).Check(testkit.Rows("\"\" 0"))
tk.MustExec("create table t(a int)")
firstSequence := 0
res := tk.MustQuery("select json_extract(@@tidb_last_ddl_info, '$.query'), json_extract(@@tidb_last_ddl_info, '$.seq_num')")
res := tk.MustQuery(lastDDLSQL)
require.Len(t, res.Rows(), 1)
require.Equal(t, "\"create table t(a int)\"", res.Rows()[0][0])
var err error
@@ -2957,10 +2959,22 @@ func TestDDLLastInfo(t *testing.T) {
tk2 := testkit.NewTestKit(t, store)
tk2.MustExec(`use test;`)
tk.MustExec("create table t2(a int)")
tk.MustQuery("select json_extract(@@tidb_last_ddl_info, '$.query'), json_extract(@@tidb_last_ddl_info, '$.seq_num')").Check(testkit.Rows(fmt.Sprintf("\"create table t2(a int)\" %d", firstSequence+1)))
tk.MustQuery(lastDDLSQL).Check(testkit.Rows(fmt.Sprintf("\"create table t2(a int)\" %d", firstSequence+1)))

tk.MustExec("drop table t, t2")
tk.MustQuery("select json_extract(@@tidb_last_ddl_info, '$.query'), json_extract(@@tidb_last_ddl_info, '$.seq_num')").Check(testkit.Rows(fmt.Sprintf("\"drop table t, t2\" %d", firstSequence+3)))
tk.MustQuery(lastDDLSQL).Check(testkit.Rows(fmt.Sprintf("\"drop table t, t2\" %d", firstSequence+3)))

// after an owner change, the sequence number is reset
ch := make(chan struct{})
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/afterSchedulerClose", func() {
close(ch)
})
dom, err := session.GetDomain(store)
require.NoError(t, err)
require.NoError(t, dom.DDL().OwnerManager().ResignOwner(context.Background()))
<-ch
tk.MustExec("create table t(a int)")
tk.MustQuery(lastDDLSQL).Check(testkit.Rows(fmt.Sprintf(`"create table t(a int)" %d`, 1)))
}

func TestDefaultCollationForUTF8MB4(t *testing.T) {
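
TestDDLLastInfo above drives the check through testkit; the same behaviour can be observed from any ordinary client session, since @@tidb_last_ddl_info is session-scoped. A hedged sketch using database/sql (the driver, DSN, and table name are placeholders, not part of this commit):

package main

import (
	"context"
	"database/sql"
	"fmt"
	"log"

	_ "github.com/go-sql-driver/mysql" // any MySQL-protocol driver works
)

func main() {
	db, err := sql.Open("mysql", "root@tcp(127.0.0.1:4000)/test") // placeholder DSN
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	ctx := context.Background()
	// Pin a single connection: the DDL and the follow-up query must run on the
	// same session for @@tidb_last_ddl_info to describe that DDL.
	conn, err := db.Conn(ctx)
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	if _, err := conn.ExecContext(ctx, "create table t_seq_demo(a int)"); err != nil {
		log.Fatal(err)
	}
	var query, seqNum string
	err = conn.QueryRowContext(ctx,
		"select json_extract(@@tidb_last_ddl_info, '$.query'),"+
			" json_extract(@@tidb_last_ddl_info, '$.seq_num')").Scan(&query, &seqNum)
	if err != nil {
		log.Fatal(err)
	}
	// seq_num keeps increasing while the same DDL owner is in charge and, after
	// this commit, restarts from 1 once ownership changes.
	fmt.Println(query, seqNum)
}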
21 changes: 2 additions & 19 deletions pkg/ddl/ddl.go
@@ -428,12 +428,6 @@ type ddlCtx struct {
hook Callback
interceptor Interceptor
}

// TODO merge with *waitSchemaSyncedController into another new struct.
ddlSeqNumMu struct {
sync.Mutex
seqNum uint64
}
}

// the schema synchronization mechanism now requires strict incremental schema versions.
@@ -832,6 +826,7 @@ func (d *ddl) newDeleteRangeManager(mock bool) delRangeManager {
}

func (d *ddl) prepareLocalModeWorkers() {
var idAllocator atomic.Uint64
workerFactory := func(tp workerType) func() (pools.Resource, error) {
return func() (pools.Resource, error) {
wk := newWorker(d.ctx, tp, d.sessPool, d.delRangeMgr, d.ddlCtx)
@@ -841,6 +836,7 @@ func (d *ddl) prepareLocalModeWorkers() {
}
sessForJob.GetSessionVars().SetDiskFullOpt(kvrpcpb.DiskFullOpt_AllowedOnAlmostFull)
wk.sess = sess.NewSession(sessForJob)
wk.seqAllocator = &idAllocator
metrics.DDLCounter.WithLabelValues(fmt.Sprintf("%s_%s", metrics.CreateDDL, wk.String())).Inc()
return wk, nil
}
@@ -969,19 +965,6 @@ func (d *ddl) DisableDDL() error {
return nil
}

// GetNextDDLSeqNum return the next DDL seq num.
func (s *jobScheduler) GetNextDDLSeqNum() (uint64, error) {
var count uint64
ctx := kv.WithInternalSourceType(s.schCtx, kv.InternalTxnDDL)
err := kv.RunInNewTxn(ctx, s.store, true, func(_ context.Context, txn kv.Transaction) error {
t := meta.NewMeta(txn)
var err error
count, err = t.GetHistoryDDLCount()
return err
})
return count, err
}

func (d *ddl) close() {
if d.ctx.Err() != nil {
return
29 changes: 2 additions & 27 deletions pkg/ddl/ddl_worker.go
@@ -108,7 +108,7 @@ type worker struct {
sess *sess.Session // sess is used and only used in running DDL job.
delRangeManager delRangeManager
logCtx context.Context
seqNumLocked bool
seqAllocator *atomic.Uint64

*ddlCtx
}
@@ -942,20 +942,13 @@ func (w *worker) finishDDLJob(t *meta.Meta, job *model.Job) (err error) {
// Notice: warnings is used to support non-strict mode.
updateRawArgs = false
}
w.writeDDLSeqNum(job)
job.SeqNum = w.seqAllocator.Add(1)
w.removeJobCtx(job)
failpoint.InjectCall("afterFinishDDLJob", job)
err = AddHistoryDDLJob(w.ctx, w.sess, t, job, updateRawArgs)
return errors.Trace(err)
}

func (w *worker) writeDDLSeqNum(job *model.Job) {
w.ddlSeqNumMu.Lock()
w.ddlSeqNumMu.seqNum++
w.seqNumLocked = true
job.SeqNum = w.ddlSeqNumMu.seqNum
}

func finishRecoverTable(w *worker, job *model.Job) error {
var (
recoverInfo *RecoverInfo
@@ -1006,17 +999,6 @@ func (w *JobContext) setDDLLabelForTopSQL(jobQuery string) {
}
}

func (w *worker) unlockSeqNum(err error) {
if w.seqNumLocked {
if err != nil {
// if meet error, we should reset seqNum.
w.ddlSeqNumMu.seqNum--
}
w.seqNumLocked = false
w.ddlSeqNumMu.Unlock()
}
}

// DDLBackfillers contains the DDL need backfill step.
var DDLBackfillers = map[model.ActionType]string{
model.ActionAddIndex: "add_index",
@@ -1099,9 +1081,6 @@ func (w *worker) transitOneJobStep(d *ddlCtx, job *model.Job) (int64, error) {
var (
err error
)
defer func() {
w.unlockSeqNum(err)
}()

txn, err := w.prepareTxn(job)
if err != nil {
@@ -1229,10 +1208,6 @@ func (w *worker) checkBeforeCommit() error {
// 2. no need to wait schema version(only support create table now).
// 3. no register mdl info(only support create table now).
func (w *worker) HandleLocalDDLJob(d *ddlCtx, job *model.Job) (err error) {
defer func() {
w.unlockSeqNum(err)
}()

txn, err := w.prepareTxn(job)
if err != nil {
return err
14 changes: 5 additions & 9 deletions pkg/ddl/job_table.go
@@ -24,6 +24,7 @@ import (
"slices"
"strconv"
"strings"
"sync/atomic"
"time"

"github.com/ngaut/pools"
@@ -132,9 +133,10 @@ type jobScheduler struct {
schemaLoader SchemaLoader
minJobIDRefresher *systable.MinJobIDRefresher

// those fields are created on start
// those fields are created or initialized on start
reorgWorkerPool *workerPool
generalDDLWorkerPool *workerPool
seqAllocator atomic.Uint64

// those fields are shared with 'ddl' instance
// TODO ddlCtx is too large for here, we should remove dependency on it.
@@ -145,21 +147,14 @@
}

func (s *jobScheduler) start() {
var err error
s.ddlCtx.ddlSeqNumMu.Lock()
defer s.ddlCtx.ddlSeqNumMu.Unlock()
s.ddlCtx.ddlSeqNumMu.seqNum, err = s.GetNextDDLSeqNum()
if err != nil {
logutil.DDLLogger().Error("error when getting the ddl history count", zap.Error(err))
}

workerFactory := func(tp workerType) func() (pools.Resource, error) {
return func() (pools.Resource, error) {
wk := newWorker(s.schCtx, tp, s.sessPool, s.delRangeMgr, s.ddlCtx)
sessForJob, err := s.sessPool.Get()
if err != nil {
return nil, err
}
wk.seqAllocator = &s.seqAllocator
sessForJob.GetSessionVars().SetDiskFullOpt(kvrpcpb.DiskFullOpt_AllowedOnAlmostFull)
wk.sess = sess.NewSession(sessForJob)
metrics.DDLCounter.WithLabelValues(fmt.Sprintf("%s_%s", metrics.CreateDDL, wk.String())).Inc()
@@ -185,6 +180,7 @@ func (s *jobScheduler) close() {
if s.generalDDLWorkerPool != nil {
s.generalDDLWorkerPool.close()
}
failpoint.InjectCall("afterSchedulerClose")
}

func hasSysDB(job *model.Job) bool {
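
The new afterSchedulerClose injection point exists so the test can wait until the outgoing owner's scheduler has fully shut down before it issues the next DDL. Stripped of the failpoint machinery, the synchronization is just a named callback that closes a channel; a minimal stand-alone sketch, with the registry and names invented for illustration:

package main

import "fmt"

// hooks is a stand-in for the failpoint registry: production code invokes a
// named callback at a well-known point, here right after the scheduler closes.
var hooks = map[string]func(){}

func closeScheduler() {
	// ... close worker pools, as jobScheduler.close does ...
	if h, ok := hooks["afterSchedulerClose"]; ok {
		h() // plays the role of failpoint.InjectCall("afterSchedulerClose")
	}
}

func main() {
	done := make(chan struct{})
	// Test side: register a callback that closes the channel, trigger the owner
	// change (ResignOwner in the real test), then block until the hook fires.
	hooks["afterSchedulerClose"] = func() { close(done) }

	go closeScheduler()
	<-done // returns only after the scheduler has finished closing
	fmt.Println("scheduler closed; safe to run the next DDL and check seq_num")
}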
13 changes: 9 additions & 4 deletions pkg/parser/model/ddl.go
@@ -547,10 +547,15 @@ type Job struct {
// Priority is only used to set the operation priority of adding indices.
Priority int `json:"priority"`

// SeqNum is the total order in all DDLs, it's used to identify the order of
// moving the job into DDL history, not the order of the job execution.
// fast create table doesn't honor this field, there might duplicate seq_num in this case.
// TODO: deprecated it, as it forces 'moving jobs into DDL history' part to be serial.
// SeqNum is used to identify the order in which jobs are moved into the DDL
// history; it is not the order of job execution. For jobs that depend on each
// other, or that run in the same session, SeqNum is assigned in increasing order.
// When fast create table is enabled, duplicate seq_num values may appear, since
// any TiDB instance can execute the DDL in that case.
// Since v8.3 these semantics only hold while the DDL owner stays the same;
// after an owner change, the new owner restarts the sequence from 1. The old
// semantics forced the 'move jobs into DDL history' step to be serial, which
// hurt performance and had a very limited usage scenario.
SeqNum uint64 `json:"seq_num"`

// Charset is the charset when the DDL Job is created.
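
A small illustration of the duplicate-seq_num caveat in the comment above, under the hedged reading that each TiDB instance allocates from its own in-memory counter when fast create table lets any instance execute the DDL:

package main

import (
	"fmt"
	"sync/atomic"
)

func main() {
	// Two independent allocators, standing in for two TiDB instances.
	var nodeA, nodeB atomic.Uint64
	jobOnA := nodeA.Add(1)        // first job finished on node A gets SeqNum 1
	jobOnB := nodeB.Add(1)        // first job finished on node B also gets SeqNum 1
	fmt.Println(jobOnA == jobOnB) // true: seq_num alone is not globally unique
}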
