ddl: decouple job seq number from job history & reset its allocator on owner change #54774

Merged 2 commits on Jul 23, 2024
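In short, this change stops deriving the DDL job sequence number from the persisted job history count (which required a meta transaction at scheduler start and a mutex held while finishing every job) and instead gives each job scheduler an in-memory atomic.Uint64 allocator that its workers share; when DDL ownership changes, the new owner starts with a fresh allocator, so seq_num restarts from 1. The sketch below only illustrates that allocation scheme; `scheduler`, `worker`, and `finishJob` are made-up names, not the TiDB types touched in the diff.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// scheduler stands in for the per-owner job scheduler: its zero-valued counter
// lives and dies with one owner's scheduler instance.
type scheduler struct {
	seqAllocator atomic.Uint64
}

// worker stands in for a DDL worker holding a pointer to its scheduler's allocator.
type worker struct {
	seqAllocator *atomic.Uint64
}

// finishJob allocates the next sequence number with a single atomic op,
// replacing the old lock/increment/rollback-on-error bookkeeping.
func (w *worker) finishJob() uint64 {
	return w.seqAllocator.Add(1)
}

func main() {
	s := &scheduler{}
	w := &worker{seqAllocator: &s.seqAllocator}
	fmt.Println(w.finishJob(), w.finishJob()) // 1 2

	// Owner change: a new scheduler means a new allocator, so numbering restarts.
	s2 := &scheduler{}
	w2 := &worker{seqAllocator: &s2.seqAllocator}
	fmt.Println(w2.finishJob()) // 1
}
```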
22 changes: 18 additions & 4 deletions pkg/ddl/db_integration_test.go
@@ -48,6 +48,7 @@ import (
"github.com/pingcap/tidb/pkg/tablecodec"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/pingcap/tidb/pkg/testkit/external"
"github.com/pingcap/tidb/pkg/testkit/testfailpoint"
"github.com/pingcap/tidb/pkg/util/collate"
contextutil "github.com/pingcap/tidb/pkg/util/context"
"github.com/pingcap/tidb/pkg/util/dbterror"
@@ -2944,10 +2945,11 @@ func TestDDLLastInfo(t *testing.T) {

tk := testkit.NewTestKit(t, store)
tk.MustExec(`use test;`)
tk.MustQuery("select json_extract(@@tidb_last_ddl_info, '$.query'), json_extract(@@tidb_last_ddl_info, '$.seq_num')").Check(testkit.Rows("\"\" 0"))
lastDDLSQL := "select json_extract(@@tidb_last_ddl_info, '$.query'), json_extract(@@tidb_last_ddl_info, '$.seq_num')"
tk.MustQuery(lastDDLSQL).Check(testkit.Rows("\"\" 0"))
tk.MustExec("create table t(a int)")
firstSequence := 0
res := tk.MustQuery("select json_extract(@@tidb_last_ddl_info, '$.query'), json_extract(@@tidb_last_ddl_info, '$.seq_num')")
res := tk.MustQuery(lastDDLSQL)
require.Len(t, res.Rows(), 1)
require.Equal(t, "\"create table t(a int)\"", res.Rows()[0][0])
var err error
@@ -2957,10 +2959,22 @@ func TestDDLLastInfo(t *testing.T) {
tk2 := testkit.NewTestKit(t, store)
tk2.MustExec(`use test;`)
tk.MustExec("create table t2(a int)")
tk.MustQuery("select json_extract(@@tidb_last_ddl_info, '$.query'), json_extract(@@tidb_last_ddl_info, '$.seq_num')").Check(testkit.Rows(fmt.Sprintf("\"create table t2(a int)\" %d", firstSequence+1)))
tk.MustQuery(lastDDLSQL).Check(testkit.Rows(fmt.Sprintf("\"create table t2(a int)\" %d", firstSequence+1)))

tk.MustExec("drop table t, t2")
tk.MustQuery("select json_extract(@@tidb_last_ddl_info, '$.query'), json_extract(@@tidb_last_ddl_info, '$.seq_num')").Check(testkit.Rows(fmt.Sprintf("\"drop table t, t2\" %d", firstSequence+3)))
tk.MustQuery(lastDDLSQL).Check(testkit.Rows(fmt.Sprintf("\"drop table t, t2\" %d", firstSequence+3)))

// on owner change, the sequence number will be reset
ch := make(chan struct{})
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/afterSchedulerClose", func() {
close(ch)
})
dom, err := session.GetDomain(store)
require.NoError(t, err)
require.NoError(t, dom.DDL().OwnerManager().ResignOwner(context.Background()))
<-ch
tk.MustExec("create table t(a int)")
tk.MustQuery(lastDDLSQL).Check(testkit.Rows(fmt.Sprintf(`"create table t(a int)" %d`, 1)))
}

func TestDefaultCollationForUTF8MB4(t *testing.T) {
21 changes: 2 additions & 19 deletions pkg/ddl/ddl.go
@@ -423,12 +423,6 @@ type ddlCtx struct {
hook Callback
interceptor Interceptor
}

// TODO merge with *waitSchemaSyncedController into another new struct.
ddlSeqNumMu struct {
sync.Mutex
seqNum uint64
}
}

// the schema synchronization mechanism now requires strict incremental schema versions.
@@ -827,6 +821,7 @@ func (d *ddl) newDeleteRangeManager(mock bool) delRangeManager {
}

func (d *ddl) prepareLocalModeWorkers() {
var idAllocator atomic.Uint64
workerFactory := func(tp workerType) func() (pools.Resource, error) {
return func() (pools.Resource, error) {
wk := newWorker(d.ctx, tp, d.sessPool, d.delRangeMgr, d.ddlCtx)
@@ -836,6 +831,7 @@ func (d *ddl) prepareLocalModeWorkers() {
}
sessForJob.GetSessionVars().SetDiskFullOpt(kvrpcpb.DiskFullOpt_AllowedOnAlmostFull)
wk.sess = sess.NewSession(sessForJob)
wk.seqAllocator = &idAllocator
metrics.DDLCounter.WithLabelValues(fmt.Sprintf("%s_%s", metrics.CreateDDL, wk.String())).Inc()
return wk, nil
}
@@ -959,19 +955,6 @@ func (d *ddl) DisableDDL() error {
return nil
}

// GetNextDDLSeqNum return the next DDL seq num.
func (s *jobScheduler) GetNextDDLSeqNum() (uint64, error) {
var count uint64
ctx := kv.WithInternalSourceType(s.schCtx, kv.InternalTxnDDL)
err := kv.RunInNewTxn(ctx, s.store, true, func(_ context.Context, txn kv.Transaction) error {
t := meta.NewMeta(txn)
var err error
count, err = t.GetHistoryDDLCount()
return err
})
return count, err
}

func (d *ddl) close() {
if d.ctx.Err() != nil {
return
29 changes: 2 additions & 27 deletions pkg/ddl/ddl_worker.go
@@ -108,7 +108,7 @@ type worker struct {
sess *sess.Session // sess is used and only used in running DDL job.
delRangeManager delRangeManager
logCtx context.Context
seqNumLocked bool
seqAllocator *atomic.Uint64

*ddlCtx
}
@@ -954,20 +954,13 @@ func (w *worker) finishDDLJob(t *meta.Meta, job *model.Job) (err error) {
// Notice: warnings is used to support non-strict mode.
updateRawArgs = false
}
w.writeDDLSeqNum(job)
job.SeqNum = w.seqAllocator.Add(1)
w.removeJobCtx(job)
failpoint.InjectCall("afterFinishDDLJob", job)
err = AddHistoryDDLJob(w.ctx, w.sess, t, job, updateRawArgs)
return errors.Trace(err)
}

func (w *worker) writeDDLSeqNum(job *model.Job) {
w.ddlSeqNumMu.Lock()
w.ddlSeqNumMu.seqNum++
w.seqNumLocked = true
job.SeqNum = w.ddlSeqNumMu.seqNum
}

func finishRecoverTable(w *worker, job *model.Job) error {
var (
recoverInfo *RecoverInfo
@@ -1018,17 +1011,6 @@ func (w *JobContext) setDDLLabelForTopSQL(jobQuery string) {
}
}

func (w *worker) unlockSeqNum(err error) {
if w.seqNumLocked {
if err != nil {
// if meet error, we should reset seqNum.
w.ddlSeqNumMu.seqNum--
}
w.seqNumLocked = false
w.ddlSeqNumMu.Unlock()
}
}

// DDLBackfillers contains the DDL need backfill step.
var DDLBackfillers = map[model.ActionType]string{
model.ActionAddIndex: "add_index",
@@ -1108,9 +1090,6 @@ func (w *worker) HandleDDLJobTable(d *ddlCtx, job *model.Job) (int64, error) {
schemaVer int64
runJobErr error
)
defer func() {
w.unlockSeqNum(err)
}()

txn, err := w.prepareTxn(job)
if err != nil {
@@ -1237,10 +1216,6 @@ func (w *worker) checkBeforeCommit() error {
// 2. no need to wait schema version(only support create table now).
// 3. no register mdl info(only support create table now).
func (w *worker) HandleLocalDDLJob(d *ddlCtx, job *model.Job) (err error) {
defer func() {
w.unlockSeqNum(err)
}()

txn, err := w.prepareTxn(job)
if err != nil {
return err
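A standalone sketch (illustrative code, not part of the PR) of why the shared atomic counter handed to each worker can replace the removed `writeDDLSeqNum`/`unlockSeqNum` pair: concurrent `Add(1)` calls never return the same value, so every finished job still gets a unique, increasing `SeqNum` within one scheduler's lifetime, and there is no error path that needs to roll the counter back.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

func main() {
	var seq atomic.Uint64 // shared by all "workers", like the scheduler's seqAllocator
	var seen sync.Map
	var wg sync.WaitGroup
	for w := 0; w < 4; w++ { // four workers finishing jobs concurrently
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := 0; i < 100; i++ {
				n := seq.Add(1)
				if _, dup := seen.LoadOrStore(n, struct{}{}); dup {
					panic("duplicate seq num handed out")
				}
			}
		}()
	}
	wg.Wait()
	fmt.Println("allocated", seq.Load(), "unique seq numbers") // 400
}
```

The trade-off, as the updated `SeqNum` comment in pkg/parser/model/ddl.go below spells out, is that numbers are no longer continuous across owner changes.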
14 changes: 5 additions & 9 deletions pkg/ddl/job_table.go
@@ -24,6 +24,7 @@ import (
"slices"
"strconv"
"strings"
"sync/atomic"
"time"

"github.com/ngaut/pools"
@@ -129,9 +130,10 @@ type jobScheduler struct {
sysTblMgr systable.Manager
schemaLoader SchemaLoader

// those fields are created on start
// those fields are created or initialized on start
reorgWorkerPool *workerPool
generalDDLWorkerPool *workerPool
seqAllocator atomic.Uint64

// those fields are shared with 'ddl' instance
// TODO ddlCtx is too large for here, we should remove dependency on it.
@@ -142,21 +144,14 @@
}

func (s *jobScheduler) start() {
var err error
s.ddlCtx.ddlSeqNumMu.Lock()
defer s.ddlCtx.ddlSeqNumMu.Unlock()
s.ddlCtx.ddlSeqNumMu.seqNum, err = s.GetNextDDLSeqNum()
if err != nil {
logutil.DDLLogger().Error("error when getting the ddl history count", zap.Error(err))
}

workerFactory := func(tp workerType) func() (pools.Resource, error) {
return func() (pools.Resource, error) {
wk := newWorker(s.schCtx, tp, s.sessPool, s.delRangeMgr, s.ddlCtx)
sessForJob, err := s.sessPool.Get()
if err != nil {
return nil, err
}
wk.seqAllocator = &s.seqAllocator
sessForJob.GetSessionVars().SetDiskFullOpt(kvrpcpb.DiskFullOpt_AllowedOnAlmostFull)
wk.sess = sess.NewSession(sessForJob)
metrics.DDLCounter.WithLabelValues(fmt.Sprintf("%s_%s", metrics.CreateDDL, wk.String())).Inc()
@@ -182,6 +177,7 @@ func (s *jobScheduler) close() {
if s.generalDDLWorkerPool != nil {
s.generalDDLWorkerPool.close()
}
failpoint.InjectCall("afterSchedulerClose")
}

func hasSysDB(job *model.Job) bool {
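The new `failpoint.InjectCall("afterSchedulerClose")` hook exists so a test can tell exactly when the resigned owner's scheduler has finished closing. A hedged sketch of the intended usage, mirroring the `TestDDLLastInfo` change above (the `t`, `dom`, and `tk` values are assumed to come from the usual testkit setup):

```go
// Block until jobScheduler.close() of the resigned owner has returned, so the
// next DDL statement is guaranteed to be finished by a new owner whose
// seqAllocator starts from zero again.
done := make(chan struct{})
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/afterSchedulerClose", func() {
	close(done)
})
require.NoError(t, dom.DDL().OwnerManager().ResignOwner(context.Background()))
<-done
tk.MustExec("create table t3(a int)") // finished by the new owner: seq_num restarts at 1
```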
13 changes: 9 additions & 4 deletions pkg/parser/model/ddl.go
@@ -547,10 +547,15 @@ type Job struct {
// Priority is only used to set the operation priority of adding indices.
Priority int `json:"priority"`

// SeqNum is the total order in all DDLs, it's used to identify the order of
// moving the job into DDL history, not the order of the job execution.
// fast create table doesn't honor this field, there might duplicate seq_num in this case.
// TODO: deprecated it, as it forces 'moving jobs into DDL history' part to be serial.
// SeqNum is used to identify the order in which jobs are moved into the DDL
// history, not the order of job execution. For jobs with dependencies, or jobs
// run in the same session, SeqNum is in increasing order.
// When fast create table is used, seq_num may be duplicated because any TiDB
// instance can execute the DDL in this case.
// Since 8.3, the previous semantics are only honored while the DDL owner stays
// unchanged; on owner change, the new owner restarts from 1. The previous
// semantics forced the 'move jobs into DDL history' step to be serial, which
// hurt performance and had very limited usage scenarios.
SeqNum uint64 `json:"seq_num"`

// Charset is the charset when the DDL Job is created.
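To make the comment above concrete, here is a hedged, client-side view of the new semantics: `SeqNum` is surfaced through the session variable `@@tidb_last_ddl_info`, which is what the `db_integration_test.go` change queries. The snippet below is illustrative only; it assumes a TiDB instance reachable at 127.0.0.1:4000 and uses the standard `go-sql-driver/mysql` driver.

```go
package main

import (
	"context"
	"database/sql"
	"fmt"
	"log"

	_ "github.com/go-sql-driver/mysql"
)

func main() {
	db, err := sql.Open("mysql", "root@tcp(127.0.0.1:4000)/test")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	ctx := context.Background()
	// Use one connection so the session-scoped @@tidb_last_ddl_info we read
	// belongs to the same session that ran the DDL.
	conn, err := db.Conn(ctx)
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	if _, err := conn.ExecContext(ctx, "create table if not exists t_seq_demo(a int)"); err != nil {
		log.Fatal(err)
	}

	var query string
	var seqNum uint64
	err = conn.QueryRowContext(ctx,
		"select json_extract(@@tidb_last_ddl_info, '$.query'), "+
			"json_extract(@@tidb_last_ddl_info, '$.seq_num')").Scan(&query, &seqNum)
	if err != nil {
		log.Fatal(err)
	}
	// seq_num increases while the same DDL owner is in charge, but restarts from 1
	// after an owner change, so only compare values from one owner's lifetime.
	fmt.Println(query, seqNum)
}
```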