Skip to content

Commit

Permalink
ddl: schedule as many jobs as possible in one round, simplify sql to …
Browse files Browse the repository at this point in the history
…query jobs (#54438)

ref #54436
  • Loading branch information
D3Hunter authored Jul 9, 2024
1 parent ef0a3eb commit 8c1ce99
Show file tree
Hide file tree
Showing 20 changed files with 198 additions and 248 deletions.
14 changes: 0 additions & 14 deletions pkg/ddl/callback.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,6 @@ type Callback interface {
OnJobUpdated(job *model.Job)
// OnWatched is called after watching owner is completed.
OnWatched(ctx context.Context)
// OnGetJobBefore is called before getting job.
OnGetJobBefore()
// OnGetJobAfter is called after getting job.
OnGetJobAfter(job *model.Job)
}

// BaseCallback implements Callback.OnChanged interface.
Expand Down Expand Up @@ -99,16 +95,6 @@ func (*BaseCallback) OnWatched(_ context.Context) {
// Nothing to do.
}

// OnGetJobBefore implements Callback.OnGetJobBefore interface.
func (*BaseCallback) OnGetJobBefore() {
// Nothing to do.
}

// OnGetJobAfter implements Callback.OnGetJobAfter interface.
func (*BaseCallback) OnGetJobAfter(_ *model.Job) {
// Nothing to do.
}

// OnUpdateReorgInfo implements ReorgCallback interface.
func (*BaseCallback) OnUpdateReorgInfo(_ *model.Job, _ int64) {
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/ddl/cancel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,8 +207,8 @@ func cancelSuccess(rs *testkit.Result) bool {

func TestCancel(t *testing.T) {
var enterCnt, exitCnt atomic.Int32
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/beforeDelivery2Worker", func(job *model.Job) { enterCnt.Add(1) })
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/afterDelivery2Worker", func(job *model.Job) { exitCnt.Add(1) })
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/beforeDeliveryJob", func(job *model.Job) { enterCnt.Add(1) })
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/afterDeliveryJob", func(job *model.Job) { exitCnt.Add(1) })
waitDDLWorkerExited := func() {
require.Eventually(t, func() bool {
return enterCnt.Load() == exitCnt.Load()
Expand Down
2 changes: 1 addition & 1 deletion pkg/ddl/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -571,7 +571,7 @@ func TestCancelJobWriteConflict(t *testing.T) {
}

func TestTxnSavepointWithDDL(t *testing.T) {
store, _ := testkit.CreateMockStoreAndDomainWithSchemaLease(t, dbTestLease)
store := testkit.CreateMockStoreWithSchemaLease(t, dbTestLease)
tk := testkit.NewTestKit(t, store)
tk2 := testkit.NewTestKit(t, store)
tk.MustExec("use test;")
Expand Down
2 changes: 1 addition & 1 deletion pkg/ddl/ddl_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func TestCreateViewConcurrently(t *testing.T) {
return
}
})
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/afterDelivery2Worker", func(job *model.Job) {
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/afterDeliveryJob", func(job *model.Job) {
if job.Type == model.ActionCreateView {
counter--
}
Expand Down
39 changes: 7 additions & 32 deletions pkg/ddl/ddl_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,8 @@ import (
"testing"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/ddl"
"github.com/pingcap/tidb/pkg/ddl/util/callback"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/testkit"
Expand Down Expand Up @@ -75,7 +73,7 @@ func TestAddBatchJobError(t *testing.T) {
}

func TestParallelDDL(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomainWithSchemaLease(t, testLease)
store := testkit.CreateMockStoreWithSchemaLease(t, testLease)

tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
Expand All @@ -100,12 +98,10 @@ func TestParallelDDL(t *testing.T) {

// set hook to execute jobs after all jobs are in queue.
jobCnt := 11
tc := &callback.TestDDLCallback{Do: dom}
once := sync.Once{}
var checkErr error
tc.OnJobRunBeforeExported = func(job *model.Job) {
// TODO: extract a unified function for other tests.
once.Do(func() {

once1 := sync.Once{}
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/beforeLoadAndDeliverJobs", func() {
once1.Do(func() {
for {
tk1 := testkit.NewTestKit(t, store)
tk1.MustExec("begin")
Expand All @@ -120,37 +116,16 @@ func TestParallelDDL(t *testing.T) {
qLen2++
}
}
if checkErr != nil {
break
}
if qLen1+qLen2 == jobCnt {
if qLen2 != 5 {
checkErr = errors.Errorf("add index jobs cnt %v != 6", qLen2)
require.FailNow(t, "add index jobs cnt %v != 6", qLen2)
}
break
}
time.Sleep(5 * time.Millisecond)
}
})
}

once1 := sync.Once{}
tc.OnGetJobBeforeExported = func() {
once1.Do(func() {
for {
tk := testkit.NewTestKit(t, store)
tk.MustExec("begin")
jobs, err := ddl.GetAllDDLJobs(tk.Session())
require.NoError(t, err)
tk.MustExec("rollback")
if len(jobs) == jobCnt {
break
}
time.Sleep(time.Millisecond * 20)
}
})
}
dom.DDL().SetHook(tc)
})

/*
prepare jobs:
Expand Down
12 changes: 6 additions & 6 deletions pkg/ddl/ddl_workerpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,6 @@ func newDDLWorkerPool(resPool *pools.ResourcePool, tp jobType) *workerPool {
// get gets workerPool from context resource pool.
// Please remember to call put after you finished using workerPool.
func (wp *workerPool) get() (*worker, error) {
if wp.resPool == nil {
return nil, nil
}

if wp.exit.Load() {
return nil, errors.Errorf("workerPool is closed")
}
Expand All @@ -63,7 +59,7 @@ func (wp *workerPool) get() (*worker, error) {

// put returns workerPool to context resource pool.
func (wp *workerPool) put(wk *worker) {
if wp.resPool == nil || wp.exit.Load() {
if wp.exit.Load() {
return
}

Expand All @@ -75,7 +71,7 @@ func (wp *workerPool) put(wk *worker) {
// close clean up the workerPool.
func (wp *workerPool) close() {
// prevent closing resPool twice.
if wp.exit.Load() || wp.resPool == nil {
if wp.exit.Load() {
return
}
wp.exit.Store(true)
Expand All @@ -87,3 +83,7 @@ func (wp *workerPool) close() {
func (wp *workerPool) tp() jobType {
return wp.t
}

func (wp *workerPool) available() int {
return int(wp.resPool.Available())
}
4 changes: 4 additions & 0 deletions pkg/ddl/ddl_workerpool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"testing"

"github.com/ngaut/pools"
"github.com/stretchr/testify/require"
)

func TestDDLWorkerPool(t *testing.T) {
Expand All @@ -28,6 +29,9 @@ func TestDDLWorkerPool(t *testing.T) {
}
}
pool := newDDLWorkerPool(pools.NewResourcePool(f(), 1, 2, 0), jobTypeReorg)
require.Equal(t, 1, pool.available())
pool.close()
require.Zero(t, pool.available())
pool.put(nil)
require.Zero(t, pool.available())
}
Loading

0 comments on commit 8c1ce99

Please sign in to comment.