Skip to content

Commit

Permalink
ddl: remove interceptor and replace some hook with failpoint (#54882)
Browse files Browse the repository at this point in the history
ref #54436
  • Loading branch information
D3Hunter authored Jul 25, 2024
1 parent f838cc8 commit 9897ddd
Show file tree
Hide file tree
Showing 20 changed files with 39 additions and 341 deletions.
50 changes: 0 additions & 50 deletions pkg/ddl/callback.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,66 +15,31 @@
package ddl

import (
"context"
"fmt"
"strings"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tidb/pkg/ddl/logutil"
"github.com/pingcap/tidb/pkg/infoschema"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/sessionctx"
"go.uber.org/zap"
)

// Interceptor is used for DDL.
type Interceptor interface {
// OnGetInfoSchema is an intercept which is called in the function ddl.GetInfoSchema(). It is used in the tests.
OnGetInfoSchema(ctx sessionctx.Context, is infoschema.InfoSchema) infoschema.InfoSchema
}

// BaseInterceptor implements Interceptor.
type BaseInterceptor struct{}

// OnGetInfoSchema implements Interceptor.OnGetInfoSchema interface.
func (*BaseInterceptor) OnGetInfoSchema(_ sessionctx.Context, is infoschema.InfoSchema) infoschema.InfoSchema {
return is
}

// Callback is used for DDL.
type Callback interface {
ReorgCallback
// OnChanged is called after a ddl statement is finished.
OnChanged(err error) error
// OnSchemaStateChanged is called after a schema state is changed.
// only called inside tests.
OnSchemaStateChanged(schemaVer int64)
// OnJobRunBefore is called before running job.
OnJobRunBefore(job *model.Job)
// OnJobRunAfter is called after running job.
OnJobRunAfter(job *model.Job)
// OnJobUpdated is called after the running job is updated.
OnJobUpdated(job *model.Job)
// OnWatched is called after watching owner is completed.
OnWatched(ctx context.Context)
}

// BaseCallback implements Callback.OnChanged interface.
type BaseCallback struct {
}

// OnChanged implements Callback interface.
func (*BaseCallback) OnChanged(err error) error {
return err
}

// OnSchemaStateChanged implements Callback interface.
func (*BaseCallback) OnSchemaStateChanged(_ int64) {
// Nothing to do.
}

// OnJobRunBefore implements Callback.OnJobRunBefore interface.
func (*BaseCallback) OnJobRunBefore(_ *model.Job) {
// Nothing to do.
Expand All @@ -90,26 +55,11 @@ func (*BaseCallback) OnJobUpdated(_ *model.Job) {
// Nothing to do.
}

// OnWatched implements Callback.OnWatched interface.
func (*BaseCallback) OnWatched(_ context.Context) {
// Nothing to do.
}

// OnUpdateReorgInfo implements ReorgCallback interface.
func (*BaseCallback) OnUpdateReorgInfo(_ *model.Job, _ int64) {
}

// SchemaLoader is used to avoid import loop, the only impl is domain currently.
type SchemaLoader interface {
Reload() error
}

// ReorgCallback is the callback for DDL reorganization.
type ReorgCallback interface {
// OnUpdateReorgInfo is called after updating reorg info for partitions.
OnUpdateReorgInfo(job *model.Job, pid int64)
}

// ****************************** Start of Customized DDL Callback Instance ****************************************

// DefaultCallback is the default callback that TiDB will use.
Expand Down
12 changes: 0 additions & 12 deletions pkg/ddl/constraint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,11 @@ func TestAlterConstraintAddDrop(t *testing.T) {

var checkErr error
d := dom.DDL()
originalCallback := d.GetHook()
callback := &callback.TestDDLCallback{}
onJobUpdatedExportedFunc := func(job *model.Job) {
if checkErr != nil {
return
}
originalCallback.OnChanged(nil)
if job.SchemaState == model.StateWriteOnly {
_, checkErr = tk1.Exec("insert into t (a, b) values(5,6) ")
}
Expand Down Expand Up @@ -83,13 +81,11 @@ func TestAlterAddConstraintStateChange(t *testing.T) {

var checkErr error
d := dom.DDL()
originalCallback := d.GetHook()
callback := &callback.TestDDLCallback{}
onJobUpdatedExportedFunc := func(job *model.Job) {
if checkErr != nil {
return
}
originalCallback.OnChanged(nil)
if job.SchemaState == model.StateWriteReorganization {
tk1.MustQuery(fmt.Sprintf("select count(1) from `%s`.`%s` where not %s limit 1", "test", "t", "a > 10")).Check(testkit.Rows("0"))
// set constraint state
Expand Down Expand Up @@ -131,11 +127,9 @@ func TestAlterAddConstraintStateChange1(t *testing.T) {
tk1.MustExec("insert into t values(12)")

d := dom.DDL()
originalCallback := d.GetHook()
callback := &callback.TestDDLCallback{}
// StatNone -> StateWriteOnly
onJobUpdatedExportedFunc1 := func(job *model.Job) {
originalCallback.OnChanged(nil)
if job.SchemaState == model.StateWriteOnly {
// set constraint state
constraintTable := external.GetTableByName(t, tk1, "test", "t")
Expand Down Expand Up @@ -171,11 +165,9 @@ func TestAlterAddConstraintStateChange2(t *testing.T) {
tk1.MustExec("insert into t values(12)")

d := dom.DDL()
originalCallback := d.GetHook()
callback := &callback.TestDDLCallback{}
// StateWriteOnly -> StateWriteReorganization
onJobUpdatedExportedFunc2 := func(job *model.Job) {
originalCallback.OnChanged(nil)
if job.SchemaState == model.StateWriteReorganization {
// set constraint state
constraintTable := external.GetTableByName(t, tk1, "test", "t")
Expand Down Expand Up @@ -210,14 +202,12 @@ func TestAlterAddConstraintStateChange3(t *testing.T) {

addCheckDone := false
d := dom.DDL()
originalCallback := d.GetHook()
callback := &callback.TestDDLCallback{}
// StateWriteReorganization -> StatePublic
onJobUpdatedExportedFunc3 := func(job *model.Job) {
if job.Type != model.ActionAddCheckConstraint || job.TableName != "t" {
return
}
originalCallback.OnChanged(nil)
if job.SchemaState == model.StatePublic && job.IsDone() {
// set constraint state
constraintTable := external.GetTableByName(t, tk1, "test", "t")
Expand Down Expand Up @@ -258,11 +248,9 @@ func TestAlterEnforcedConstraintStateChange(t *testing.T) {
tk1.MustExec("insert into t values(12)")

d := dom.DDL()
originalCallback := d.GetHook()
callback := &callback.TestDDLCallback{}
// StateWriteReorganization -> StatePublic
onJobUpdatedExportedFunc3 := func(job *model.Job) {
originalCallback.OnChanged(nil)
if job.SchemaState == model.StateWriteReorganization {
// set constraint state
constraintTable := external.GetTableByName(t, tk1, "test", "t")
Expand Down
15 changes: 1 addition & 14 deletions pkg/ddl/db_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,6 @@ func TestDropNotNullColumn(t *testing.T) {
if checkErr != nil {
return
}
err := originalCallback.OnChanged(nil)
require.NoError(t, err)
if job.SchemaState == model.StateWriteOnly {
switch sqlNum {
case 0:
Expand Down Expand Up @@ -1508,7 +1506,7 @@ func TestDDLIfExists(t *testing.T) {
// In a cluster, TiDB "a" executes the DDL.
// TiDB "b" fails to load schema, then TiDB "b" executes the DDL statement associated with the DDL statement executed by "a".
func TestParallelDDLBeforeRunDDLJob(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomainWithSchemaLease(t, 200*time.Millisecond)
store := testkit.CreateMockStoreWithSchemaLease(t, 200*time.Millisecond)
tk := testkit.NewTestKit(t, store)
tk.MustExec("create database test_db_state default charset utf8 default collate utf8_bin")
tk.MustExec("use test_db_state")
Expand All @@ -1521,8 +1519,6 @@ func TestParallelDDLBeforeRunDDLJob(t *testing.T) {
tk2 := testkit.NewTestKit(t, store)
tk2.MustExec("use test_db_state")

intercept := &callback.TestInterceptor{}

var sessionToStart sync.WaitGroup // sessionToStart is a waitgroup to wait for two session to get the same information schema
sessionToStart.Add(2)
firstDDLFinished := make(chan struct{})
Expand All @@ -1541,8 +1537,6 @@ func TestParallelDDLBeforeRunDDLJob(t *testing.T) {
<-firstDDLFinished
}
})
d := dom.DDL()
d.(ddl.DDLForTest).SetInterceptor(intercept)

// Make sure the connection 1 executes a SQL before the connection 2.
// And the connection 2 executes a SQL with an outdated information schema.
Expand All @@ -1559,9 +1553,6 @@ func TestParallelDDLBeforeRunDDLJob(t *testing.T) {
})

wg.Wait()

intercept = &callback.TestInterceptor{}
d.(ddl.DDLForTest).SetInterceptor(intercept)
}

func TestParallelAlterSchemaCharsetAndCollate(t *testing.T) {
Expand Down Expand Up @@ -1657,8 +1648,6 @@ func TestCreateExpressionIndex(t *testing.T) {
if checkErr != nil {
return
}
err := originalCallback.OnChanged(nil)
require.NoError(t, err)
switch job.SchemaState {
case model.StateDeleteOnly:
for _, sql := range stateDeleteOnlySQLs {
Expand Down Expand Up @@ -1839,8 +1828,6 @@ func TestDropExpressionIndex(t *testing.T) {
if checkErr != nil {
return
}
err := originalCallback.OnChanged(nil)
require.NoError(t, err)
switch job.SchemaState {
case model.StateDeleteOnly:
for _, sql := range stateDeleteOnlySQLs {
Expand Down
31 changes: 2 additions & 29 deletions pkg/ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,8 +196,6 @@ type DDL interface {
GetHook() Callback
// SetHook sets the hook.
SetHook(h Callback)
// GetInfoSchemaWithInterceptor gets the infoschema binding to d. It's exported for testing.
GetInfoSchemaWithInterceptor(ctx sessionctx.Context) infoschema.InfoSchema
// GetMinJobIDRefresher gets the MinJobIDRefresher, this api only works after Start.
GetMinJobIDRefresher() *systable.MinJobIDRefresher
}
Expand Down Expand Up @@ -357,12 +355,11 @@ type ddlCtx struct {
mu hookStruct
}

// TODO remove it after we remove hook and interceptor.
// TODO remove it after we remove hook.
type hookStruct struct {
sync.RWMutex
// see newDefaultCallBack for its value in normal flow.
hook Callback
interceptor Interceptor
hook Callback
}

// the schema synchronization mechanism now requires strict incremental schema versions.
Expand Down Expand Up @@ -705,7 +702,6 @@ func newDDL(ctx context.Context, options ...Option) (*ddl, *executor) {
ddlCtx.reorgCtx.reorgCtxMap = make(map[int64]*reorgCtx)
ddlCtx.jobCtx.jobCtxMap = make(map[int64]*JobContext)
ddlCtx.mu.hook = opt.Hook
ddlCtx.mu.interceptor = &BaseInterceptor{}
ctx = kv.WithInternalSourceType(ctx, kv.InternalTxnDDL)
ddlCtx.ctx, ddlCtx.cancel = context.WithCancel(ctx)
ddlCtx.schemaVersionManager = newSchemaVersionManager(ddlCtx.ctx, opt.EtcdCli)
Expand Down Expand Up @@ -955,17 +951,6 @@ func (d *ddl) GetLease() time.Duration {
return lease
}

// GetInfoSchemaWithInterceptor gets the infoschema binding to d. It's exported for testing.
// Please don't use this function, it is used by TestParallelDDLBeforeRunDDLJob to intercept the calling of d.infoHandle.Get(), use d.infoHandle.Get() instead.
// Otherwise, the TestParallelDDLBeforeRunDDLJob will hang up forever.
func (d *ddl) GetInfoSchemaWithInterceptor(ctx sessionctx.Context) infoschema.InfoSchema {
is := d.infoCache.GetLatest()

d.mu.RLock()
defer d.mu.RUnlock()
return d.mu.interceptor.OnGetInfoSchema(ctx, is)
}

func (e *executor) genGlobalIDs(count int) ([]int64, error) {
var ret []int64
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL)
Expand Down Expand Up @@ -1295,18 +1280,6 @@ func (e *executor) DoDDLJobWrapper(ctx sessionctx.Context, jobW *JobWrapper) err
}
}

func (d *ddl) callHookOnChanged(job *model.Job, err error) error {
if job.State == model.JobStateNone {
// We don't call the hook if the job haven't run yet.
return err
}
d.mu.RLock()
defer d.mu.RUnlock()

err = d.mu.hook.OnChanged(err)
return errors.Trace(err)
}

// SetBinlogClient implements DDL.SetBinlogClient interface.
func (d *ddl) SetBinlogClient(binlogCli *pumpcli.PumpsClient) {
d.binlogCli = binlogCli
Expand Down
10 changes: 0 additions & 10 deletions pkg/ddl/ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,21 +38,11 @@ import (

// DDLForTest exports for testing.
type DDLForTest interface {
// SetInterceptor sets the interceptor.
SetInterceptor(h Interceptor)
NewReorgCtx(jobID int64, rowCount int64) *reorgCtx
GetReorgCtx(jobID int64) *reorgCtx
RemoveReorgCtx(id int64)
}

// SetInterceptor implements DDL.SetInterceptor interface.
func (d *ddl) SetInterceptor(i Interceptor) {
d.mu.Lock()
defer d.mu.Unlock()

d.mu.interceptor = i
}

// IsReorgCanceled exports for testing.
func (rc *reorgCtx) IsReorgCanceled() bool {
return rc.isReorgCanceled()
Expand Down
2 changes: 1 addition & 1 deletion pkg/ddl/ddl_tiflash_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,7 @@ func (d *ddl) refreshTiFlashTicker(ctx sessionctx.Context, pollTiFlashContext *T
pollTiFlashContext.PollCounter++

// Start to process every table.
schema := d.GetInfoSchemaWithInterceptor(ctx)
schema := d.infoCache.GetLatest()
if schema == nil {
return errors.New("Schema is nil")
}
Expand Down
Loading

0 comments on commit 9897ddd

Please sign in to comment.