Skip to content

Commit

Permalink
ddl: query since min job id (#54693)
Browse files Browse the repository at this point in the history
ref #54436
  • Loading branch information
D3Hunter authored Jul 22, 2024
1 parent 60cc666 commit 0311052
Show file tree
Hide file tree
Showing 41 changed files with 435 additions and 62 deletions.
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -496,6 +496,7 @@ gen_mock: mockgen
tools/bin/mockgen -package mock github.com/pingcap/tidb/pkg/util/sqlexec RestrictedSQLExecutor > pkg/util/sqlexec/mock/restricted_sql_executor_mock.go
tools/bin/mockgen -package mockstorage github.com/pingcap/tidb/br/pkg/storage ExternalStorage > br/pkg/mock/storage/storage.go
tools/bin/mockgen -package mock github.com/pingcap/tidb/pkg/ddl SchemaLoader > pkg/ddl/mock/schema_loader_mock.go
tools/bin/mockgen -package mock github.com/pingcap/tidb/pkg/ddl/systable Manager > pkg/ddl/mock/systable_manager_mock.go

# There is no FreeBSD environment for GitHub actions. So cross-compile on Linux
# but that doesn't work with CGO_ENABLED=1, so disable cgo. The reason to have
Expand Down
4 changes: 2 additions & 2 deletions pkg/ddl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,11 @@ go_library(
"//pkg/config",
"//pkg/ddl/copr",
"//pkg/ddl/ingest",
"//pkg/ddl/internal/session",
"//pkg/ddl/label",
"//pkg/ddl/logutil",
"//pkg/ddl/placement",
"//pkg/ddl/resourcegroup",
"//pkg/ddl/session",
"//pkg/ddl/syncer",
"//pkg/ddl/systable",
"//pkg/ddl/util",
Expand Down Expand Up @@ -269,11 +269,11 @@ go_test(
"//pkg/config",
"//pkg/ddl/copr",
"//pkg/ddl/ingest",
"//pkg/ddl/internal/session",
"//pkg/ddl/logutil",
"//pkg/ddl/mock",
"//pkg/ddl/placement",
"//pkg/ddl/schematracker",
"//pkg/ddl/session",
"//pkg/ddl/syncer",
"//pkg/ddl/testutil",
"//pkg/ddl/util",
Expand Down
2 changes: 1 addition & 1 deletion pkg/ddl/backfilling.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/ddl/ingest"
sess "github.com/pingcap/tidb/pkg/ddl/internal/session"
"github.com/pingcap/tidb/pkg/ddl/logutil"
sess "github.com/pingcap/tidb/pkg/ddl/session"
ddlutil "github.com/pingcap/tidb/pkg/ddl/util"
"github.com/pingcap/tidb/pkg/expression"
exprctx "github.com/pingcap/tidb/pkg/expression/context"
Expand Down
2 changes: 1 addition & 1 deletion pkg/ddl/backfilling_operators.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/pkg/ddl/copr"
"github.com/pingcap/tidb/pkg/ddl/ingest"
"github.com/pingcap/tidb/pkg/ddl/internal/session"
"github.com/pingcap/tidb/pkg/ddl/session"
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
"github.com/pingcap/tidb/pkg/disttask/operator"
"github.com/pingcap/tidb/pkg/kv"
Expand Down
2 changes: 1 addition & 1 deletion pkg/ddl/backfilling_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/tidb/pkg/ddl/copr"
sess "github.com/pingcap/tidb/pkg/ddl/internal/session"
ddllogutil "github.com/pingcap/tidb/pkg/ddl/logutil"
sess "github.com/pingcap/tidb/pkg/ddl/session"
distsqlctx "github.com/pingcap/tidb/pkg/distsql/context"
"github.com/pingcap/tidb/pkg/errctx"
"github.com/pingcap/tidb/pkg/kv"
Expand Down
2 changes: 1 addition & 1 deletion pkg/ddl/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/errorpb"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
sess "github.com/pingcap/tidb/pkg/ddl/internal/session"
"github.com/pingcap/tidb/pkg/ddl/logutil"
sess "github.com/pingcap/tidb/pkg/ddl/session"
"github.com/pingcap/tidb/pkg/domain/infosync"
"github.com/pingcap/tidb/pkg/infoschema"
"github.com/pingcap/tidb/pkg/kv"
Expand Down
2 changes: 1 addition & 1 deletion pkg/ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/config"
sess "github.com/pingcap/tidb/pkg/ddl/internal/session"
"github.com/pingcap/tidb/pkg/ddl/logutil"
sess "github.com/pingcap/tidb/pkg/ddl/session"
"github.com/pingcap/tidb/pkg/expression"
exprctx "github.com/pingcap/tidb/pkg/expression/context"
"github.com/pingcap/tidb/pkg/infoschema"
Expand Down
20 changes: 17 additions & 3 deletions pkg/ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,10 @@ import (
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/tidb/pkg/config"
"github.com/pingcap/tidb/pkg/ddl/ingest"
sess "github.com/pingcap/tidb/pkg/ddl/internal/session"
"github.com/pingcap/tidb/pkg/ddl/logutil"
sess "github.com/pingcap/tidb/pkg/ddl/session"
"github.com/pingcap/tidb/pkg/ddl/syncer"
"github.com/pingcap/tidb/pkg/ddl/systable"
"github.com/pingcap/tidb/pkg/ddl/util"
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
"github.com/pingcap/tidb/pkg/disttask/framework/scheduler"
Expand Down Expand Up @@ -262,6 +263,8 @@ type DDL interface {
GetInfoSchemaWithInterceptor(ctx sessionctx.Context) infoschema.InfoSchema
// DoDDLJob does the DDL job, it's exported for test.
DoDDLJob(ctx sessionctx.Context, job *model.Job) error
// GetMinJobIDRefresher gets the MinJobIDRefresher, this api only works after Start.
GetMinJobIDRefresher() *systable.MinJobIDRefresher
// DoDDLJobWrapper similar to DoDDLJob, but with JobWrapper as input.
// exported for testing.
// TODO remove it after decouple components of DDL.
Expand Down Expand Up @@ -314,7 +317,9 @@ type ddl struct {
// used in the concurrency ddl.
localWorkerPool *workerPool
// get notification if any DDL job submitted or finished.
ddlJobNotifyCh chan struct{}
ddlJobNotifyCh chan struct{}
sysTblMgr systable.Manager
minJobIDRefresher *systable.MinJobIDRefresher

// localJobCh is used to delivery job in local TiDB nodes.
localJobCh chan *JobWrapper
Expand Down Expand Up @@ -855,13 +860,18 @@ func (d *ddl) prepareLocalModeWorkers() {
func (d *ddl) Start(ctxPool *pools.ResourcePool) error {
logutil.DDLLogger().Info("start DDL", zap.String("ID", d.uuid), zap.Bool("runWorker", config.GetGlobalConfig().Instance.TiDBEnableDDL.Load()))

d.sessPool = sess.NewSessionPool(ctxPool)
d.sysTblMgr = systable.NewManager(d.sessPool)
d.minJobIDRefresher = systable.NewMinJobIDRefresher(d.sysTblMgr)
d.wg.Run(func() {
d.limitDDLJobs(d.limitJobCh, d.addBatchDDLJobsV1)
})
d.wg.Run(func() {
d.limitDDLJobs(d.limitJobChV2, d.addBatchLocalDDLJobs)
})
d.sessPool = sess.NewSessionPool(ctxPool)
d.wg.Run(func() {
d.minJobIDRefresher.Start(d.ctx)
})

d.delRangeMgr = d.newDeleteRangeManager(ctxPool == nil)

Expand Down Expand Up @@ -1380,6 +1390,10 @@ func (d *ddl) SetHook(h Callback) {
d.mu.hook = h
}

func (d *ddl) GetMinJobIDRefresher() *systable.MinJobIDRefresher {
return d.minJobIDRefresher
}

func (d *ddl) startCleanDeadTableLock() {
defer func() {
d.wg.Done()
Expand Down
2 changes: 1 addition & 1 deletion pkg/ddl/ddl_history.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ import (
"strconv"

"github.com/pingcap/errors"
sess "github.com/pingcap/tidb/pkg/ddl/internal/session"
"github.com/pingcap/tidb/pkg/ddl/logutil"
sess "github.com/pingcap/tidb/pkg/ddl/session"
"github.com/pingcap/tidb/pkg/ddl/util"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/meta"
Expand Down
2 changes: 1 addition & 1 deletion pkg/ddl/ddl_history_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (

"github.com/ngaut/pools"
"github.com/pingcap/tidb/pkg/ddl"
"github.com/pingcap/tidb/pkg/ddl/internal/session"
"github.com/pingcap/tidb/pkg/ddl/session"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/meta"
"github.com/pingcap/tidb/pkg/parser/model"
Expand Down
8 changes: 4 additions & 4 deletions pkg/ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
sess "github.com/pingcap/tidb/pkg/ddl/internal/session"
"github.com/pingcap/tidb/pkg/ddl/logutil"
sess "github.com/pingcap/tidb/pkg/ddl/session"
"github.com/pingcap/tidb/pkg/ddl/util"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/meta"
Expand Down Expand Up @@ -411,16 +411,17 @@ func (d *ddl) addBatchDDLJobs(jobWs []*JobWrapper) error {
return nil
}

ctx := kv.WithInternalSourceType(d.ctx, kv.InternalTxnDDL)
se, err := d.sessPool.Get()
if err != nil {
return errors.Trace(err)
}
defer d.sessPool.Put(se)
flashClusterJobs, err := getJobsBySQL(sess.NewSession(se), JobTable, fmt.Sprintf("type = %d", model.ActionFlashbackCluster))
found, err := d.sysTblMgr.HasFlashbackClusterJob(ctx, d.minJobIDRefresher.GetCurrMinJobID())
if err != nil {
return errors.Trace(err)
}
if len(flashClusterJobs) != 0 {
if found {
return errors.Errorf("Can't add ddl job, have flashback cluster job")
}

Expand All @@ -429,7 +430,6 @@ func (d *ddl) addBatchDDLJobs(jobWs []*JobWrapper) error {
bdrRole = string(ast.BDRRoleNone)
)

ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL)
err = kv.RunInNewTxn(ctx, d.store, true, func(_ context.Context, txn kv.Transaction) error {
t := meta.NewMeta(txn)

Expand Down
2 changes: 1 addition & 1 deletion pkg/ddl/delete_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
sess "github.com/pingcap/tidb/pkg/ddl/internal/session"
"github.com/pingcap/tidb/pkg/ddl/logutil"
sess "github.com/pingcap/tidb/pkg/ddl/session"
"github.com/pingcap/tidb/pkg/ddl/util"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/parser/model"
Expand Down
2 changes: 1 addition & 1 deletion pkg/ddl/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
"time"

"github.com/pingcap/tidb/pkg/ddl"
sess "github.com/pingcap/tidb/pkg/ddl/internal/session"
sess "github.com/pingcap/tidb/pkg/ddl/session"
"github.com/pingcap/tidb/pkg/domain"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/meta"
Expand Down
2 changes: 1 addition & 1 deletion pkg/ddl/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (

"github.com/ngaut/pools"
"github.com/pingcap/tidb/pkg/ddl/copr"
"github.com/pingcap/tidb/pkg/ddl/internal/session"
"github.com/pingcap/tidb/pkg/ddl/session"
"github.com/pingcap/tidb/pkg/errctx"
"github.com/pingcap/tidb/pkg/expression"
"github.com/pingcap/tidb/pkg/kv"
Expand Down
2 changes: 1 addition & 1 deletion pkg/ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ import (
"github.com/pingcap/tidb/pkg/config"
"github.com/pingcap/tidb/pkg/ddl/copr"
"github.com/pingcap/tidb/pkg/ddl/ingest"
sess "github.com/pingcap/tidb/pkg/ddl/internal/session"
"github.com/pingcap/tidb/pkg/ddl/logutil"
sess "github.com/pingcap/tidb/pkg/ddl/session"
ddlutil "github.com/pingcap/tidb/pkg/ddl/util"
"github.com/pingcap/tidb/pkg/disttask/framework/handle"
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
Expand Down
2 changes: 1 addition & 1 deletion pkg/ddl/index_cop.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/ddl/copr"
"github.com/pingcap/tidb/pkg/ddl/ingest"
sess "github.com/pingcap/tidb/pkg/ddl/internal/session"
sess "github.com/pingcap/tidb/pkg/ddl/session"
"github.com/pingcap/tidb/pkg/distsql"
distsqlctx "github.com/pingcap/tidb/pkg/distsql/context"
"github.com/pingcap/tidb/pkg/errctx"
Expand Down
4 changes: 2 additions & 2 deletions pkg/ddl/ingest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/config",
"//pkg/ddl/internal/session",
"//pkg/ddl/logutil",
"//pkg/ddl/session",
"//pkg/ddl/util",
"//pkg/kv",
"//pkg/lightning/backend",
Expand Down Expand Up @@ -78,7 +78,7 @@ go_test(
"//pkg/config",
"//pkg/ddl",
"//pkg/ddl/ingest/testutil",
"//pkg/ddl/internal/session",
"//pkg/ddl/session",
"//pkg/ddl/testutil",
"//pkg/ddl/util/callback",
"//pkg/domain",
Expand Down
2 changes: 1 addition & 1 deletion pkg/ddl/ingest/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/config"
sess "github.com/pingcap/tidb/pkg/ddl/internal/session"
"github.com/pingcap/tidb/pkg/ddl/logutil"
sess "github.com/pingcap/tidb/pkg/ddl/session"
"github.com/pingcap/tidb/pkg/ddl/util"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/meta"
Expand Down
2 changes: 1 addition & 1 deletion pkg/ddl/ingest/checkpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (

"github.com/ngaut/pools"
"github.com/pingcap/tidb/pkg/ddl/ingest"
"github.com/pingcap/tidb/pkg/ddl/internal/session"
"github.com/pingcap/tidb/pkg/ddl/session"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/oracle"
Expand Down
2 changes: 1 addition & 1 deletion pkg/ddl/ingest/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/config"
sess "github.com/pingcap/tidb/pkg/ddl/internal/session"
"github.com/pingcap/tidb/pkg/ddl/logutil"
sess "github.com/pingcap/tidb/pkg/ddl/session"
"github.com/pingcap/tidb/pkg/lightning/log"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/util"
Expand Down
33 changes: 18 additions & 15 deletions pkg/ddl/job_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/tidb/pkg/ddl/ingest"
sess "github.com/pingcap/tidb/pkg/ddl/internal/session"
"github.com/pingcap/tidb/pkg/ddl/logutil"
sess "github.com/pingcap/tidb/pkg/ddl/session"
"github.com/pingcap/tidb/pkg/ddl/syncer"
"github.com/pingcap/tidb/pkg/ddl/systable"
"github.com/pingcap/tidb/pkg/ddl/util"
Expand Down Expand Up @@ -97,12 +97,14 @@ var _ owner.Listener = (*ownerListener)(nil)

func (l *ownerListener) OnBecomeOwner() {
ctx, cancelFunc := context.WithCancel(l.ddl.ddlCtx.ctx)
sysTblMgr := systable.NewManager(l.ddl.sessPool)
l.scheduler = &jobScheduler{
schCtx: ctx,
cancel: cancelFunc,
runningJobs: newRunningJobs(),
sysTblMgr: systable.NewManager(l.ddl.sessPool),
schemaLoader: l.ddl.schemaLoader,
schCtx: ctx,
cancel: cancelFunc,
runningJobs: newRunningJobs(),
sysTblMgr: sysTblMgr,
schemaLoader: l.ddl.schemaLoader,
minJobIDRefresher: l.ddl.minJobIDRefresher,

ddlCtx: l.ddl.ddlCtx,
ddlJobNotifyCh: l.ddl.ddlJobNotifyCh,
Expand All @@ -122,12 +124,13 @@ func (l *ownerListener) OnRetireOwner() {
// jobScheduler is used to schedule the DDL jobs, it's only run on the DDL owner.
type jobScheduler struct {
// *ddlCtx already have context named as "ctx", so we use "schCtx" here to avoid confusion.
schCtx context.Context
cancel context.CancelFunc
wg tidbutil.WaitGroupWrapper
runningJobs *runningJobs
sysTblMgr systable.Manager
schemaLoader SchemaLoader
schCtx context.Context
cancel context.CancelFunc
wg tidbutil.WaitGroupWrapper
runningJobs *runningJobs
sysTblMgr systable.Manager
schemaLoader SchemaLoader
minJobIDRefresher *systable.MinJobIDRefresher

// those fields are created on start
reorgWorkerPool *workerPool
Expand Down Expand Up @@ -381,12 +384,12 @@ func (s *jobScheduler) loadAndDeliverJobs(se *sess.Session) error {

defer s.runningJobs.resetAllPending()

const getJobSQL = `select reorg, job_meta from mysql.tidb_ddl_job %s order by job_id`
const getJobSQL = `select reorg, job_meta from mysql.tidb_ddl_job where job_id >= %d %s order by job_id`
var whereClause string
if ids := s.runningJobs.allIDs(); len(ids) > 0 {
whereClause = fmt.Sprintf("where job_id not in (%s)", ids)
whereClause = fmt.Sprintf("and job_id not in (%s)", ids)
}
sql := fmt.Sprintf(getJobSQL, whereClause)
sql := fmt.Sprintf(getJobSQL, s.minJobIDRefresher.GetCurrMinJobID(), whereClause)
rows, err := se.Execute(context.Background(), sql, "load_ddl_jobs")
if err != nil {
return errors.Trace(err)
Expand Down
10 changes: 8 additions & 2 deletions pkg/ddl/mock/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,14 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library")

go_library(
name = "mock",
srcs = ["schema_loader_mock.go"],
srcs = [
"schema_loader_mock.go",
"systable_manager_mock.go",
],
importpath = "github.com/pingcap/tidb/pkg/ddl/mock",
visibility = ["//visibility:public"],
deps = ["@org_uber_go_mock//gomock"],
deps = [
"//pkg/parser/model",
"@org_uber_go_mock//gomock",
],
)
Loading

0 comments on commit 0311052

Please sign in to comment.