diff --git a/pkg/ddl/callback.go b/pkg/ddl/callback.go index fc1ac7d2188e3..7f51a61cc29f8 100644 --- a/pkg/ddl/callback.go +++ b/pkg/ddl/callback.go @@ -15,7 +15,6 @@ package ddl import ( - "context" "fmt" "strings" "time" @@ -23,58 +22,24 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/log" "github.com/pingcap/tidb/pkg/ddl/logutil" - "github.com/pingcap/tidb/pkg/infoschema" "github.com/pingcap/tidb/pkg/parser/model" - "github.com/pingcap/tidb/pkg/sessionctx" "go.uber.org/zap" ) -// Interceptor is used for DDL. -type Interceptor interface { - // OnGetInfoSchema is an intercept which is called in the function ddl.GetInfoSchema(). It is used in the tests. - OnGetInfoSchema(ctx sessionctx.Context, is infoschema.InfoSchema) infoschema.InfoSchema -} - -// BaseInterceptor implements Interceptor. -type BaseInterceptor struct{} - -// OnGetInfoSchema implements Interceptor.OnGetInfoSchema interface. -func (*BaseInterceptor) OnGetInfoSchema(_ sessionctx.Context, is infoschema.InfoSchema) infoschema.InfoSchema { - return is -} - // Callback is used for DDL. type Callback interface { - ReorgCallback - // OnChanged is called after a ddl statement is finished. - OnChanged(err error) error - // OnSchemaStateChanged is called after a schema state is changed. - // only called inside tests. - OnSchemaStateChanged(schemaVer int64) // OnJobRunBefore is called before running job. OnJobRunBefore(job *model.Job) // OnJobRunAfter is called after running job. OnJobRunAfter(job *model.Job) // OnJobUpdated is called after the running job is updated. OnJobUpdated(job *model.Job) - // OnWatched is called after watching owner is completed. - OnWatched(ctx context.Context) } // BaseCallback implements Callback.OnChanged interface. type BaseCallback struct { } -// OnChanged implements Callback interface. -func (*BaseCallback) OnChanged(err error) error { - return err -} - -// OnSchemaStateChanged implements Callback interface. -func (*BaseCallback) OnSchemaStateChanged(_ int64) { - // Nothing to do. -} - // OnJobRunBefore implements Callback.OnJobRunBefore interface. func (*BaseCallback) OnJobRunBefore(_ *model.Job) { // Nothing to do. @@ -90,26 +55,11 @@ func (*BaseCallback) OnJobUpdated(_ *model.Job) { // Nothing to do. } -// OnWatched implements Callback.OnWatched interface. -func (*BaseCallback) OnWatched(_ context.Context) { - // Nothing to do. -} - -// OnUpdateReorgInfo implements ReorgCallback interface. -func (*BaseCallback) OnUpdateReorgInfo(_ *model.Job, _ int64) { -} - // SchemaLoader is used to avoid import loop, the only impl is domain currently. type SchemaLoader interface { Reload() error } -// ReorgCallback is the callback for DDL reorganization. -type ReorgCallback interface { - // OnUpdateReorgInfo is called after updating reorg info for partitions. - OnUpdateReorgInfo(job *model.Job, pid int64) -} - // ****************************** Start of Customized DDL Callback Instance **************************************** // DefaultCallback is the default callback that TiDB will use. diff --git a/pkg/ddl/constraint_test.go b/pkg/ddl/constraint_test.go index 1992885d2e1a7..96d1f60bb185e 100644 --- a/pkg/ddl/constraint_test.go +++ b/pkg/ddl/constraint_test.go @@ -48,13 +48,11 @@ func TestAlterConstraintAddDrop(t *testing.T) { var checkErr error d := dom.DDL() - originalCallback := d.GetHook() callback := &callback.TestDDLCallback{} onJobUpdatedExportedFunc := func(job *model.Job) { if checkErr != nil { return } - originalCallback.OnChanged(nil) if job.SchemaState == model.StateWriteOnly { _, checkErr = tk1.Exec("insert into t (a, b) values(5,6) ") } @@ -83,13 +81,11 @@ func TestAlterAddConstraintStateChange(t *testing.T) { var checkErr error d := dom.DDL() - originalCallback := d.GetHook() callback := &callback.TestDDLCallback{} onJobUpdatedExportedFunc := func(job *model.Job) { if checkErr != nil { return } - originalCallback.OnChanged(nil) if job.SchemaState == model.StateWriteReorganization { tk1.MustQuery(fmt.Sprintf("select count(1) from `%s`.`%s` where not %s limit 1", "test", "t", "a > 10")).Check(testkit.Rows("0")) // set constraint state @@ -131,11 +127,9 @@ func TestAlterAddConstraintStateChange1(t *testing.T) { tk1.MustExec("insert into t values(12)") d := dom.DDL() - originalCallback := d.GetHook() callback := &callback.TestDDLCallback{} // StatNone -> StateWriteOnly onJobUpdatedExportedFunc1 := func(job *model.Job) { - originalCallback.OnChanged(nil) if job.SchemaState == model.StateWriteOnly { // set constraint state constraintTable := external.GetTableByName(t, tk1, "test", "t") @@ -171,11 +165,9 @@ func TestAlterAddConstraintStateChange2(t *testing.T) { tk1.MustExec("insert into t values(12)") d := dom.DDL() - originalCallback := d.GetHook() callback := &callback.TestDDLCallback{} // StateWriteOnly -> StateWriteReorganization onJobUpdatedExportedFunc2 := func(job *model.Job) { - originalCallback.OnChanged(nil) if job.SchemaState == model.StateWriteReorganization { // set constraint state constraintTable := external.GetTableByName(t, tk1, "test", "t") @@ -210,14 +202,12 @@ func TestAlterAddConstraintStateChange3(t *testing.T) { addCheckDone := false d := dom.DDL() - originalCallback := d.GetHook() callback := &callback.TestDDLCallback{} // StateWriteReorganization -> StatePublic onJobUpdatedExportedFunc3 := func(job *model.Job) { if job.Type != model.ActionAddCheckConstraint || job.TableName != "t" { return } - originalCallback.OnChanged(nil) if job.SchemaState == model.StatePublic && job.IsDone() { // set constraint state constraintTable := external.GetTableByName(t, tk1, "test", "t") @@ -258,11 +248,9 @@ func TestAlterEnforcedConstraintStateChange(t *testing.T) { tk1.MustExec("insert into t values(12)") d := dom.DDL() - originalCallback := d.GetHook() callback := &callback.TestDDLCallback{} // StateWriteReorganization -> StatePublic onJobUpdatedExportedFunc3 := func(job *model.Job) { - originalCallback.OnChanged(nil) if job.SchemaState == model.StateWriteReorganization { // set constraint state constraintTable := external.GetTableByName(t, tk1, "test", "t") diff --git a/pkg/ddl/db_change_test.go b/pkg/ddl/db_change_test.go index 25187fb000040..ddb6709bef06b 100644 --- a/pkg/ddl/db_change_test.go +++ b/pkg/ddl/db_change_test.go @@ -151,8 +151,6 @@ func TestDropNotNullColumn(t *testing.T) { if checkErr != nil { return } - err := originalCallback.OnChanged(nil) - require.NoError(t, err) if job.SchemaState == model.StateWriteOnly { switch sqlNum { case 0: @@ -1508,7 +1506,7 @@ func TestDDLIfExists(t *testing.T) { // In a cluster, TiDB "a" executes the DDL. // TiDB "b" fails to load schema, then TiDB "b" executes the DDL statement associated with the DDL statement executed by "a". func TestParallelDDLBeforeRunDDLJob(t *testing.T) { - store, dom := testkit.CreateMockStoreAndDomainWithSchemaLease(t, 200*time.Millisecond) + store := testkit.CreateMockStoreWithSchemaLease(t, 200*time.Millisecond) tk := testkit.NewTestKit(t, store) tk.MustExec("create database test_db_state default charset utf8 default collate utf8_bin") tk.MustExec("use test_db_state") @@ -1521,8 +1519,6 @@ func TestParallelDDLBeforeRunDDLJob(t *testing.T) { tk2 := testkit.NewTestKit(t, store) tk2.MustExec("use test_db_state") - intercept := &callback.TestInterceptor{} - var sessionToStart sync.WaitGroup // sessionToStart is a waitgroup to wait for two session to get the same information schema sessionToStart.Add(2) firstDDLFinished := make(chan struct{}) @@ -1541,8 +1537,6 @@ func TestParallelDDLBeforeRunDDLJob(t *testing.T) { <-firstDDLFinished } }) - d := dom.DDL() - d.(ddl.DDLForTest).SetInterceptor(intercept) // Make sure the connection 1 executes a SQL before the connection 2. // And the connection 2 executes a SQL with an outdated information schema. @@ -1559,9 +1553,6 @@ func TestParallelDDLBeforeRunDDLJob(t *testing.T) { }) wg.Wait() - - intercept = &callback.TestInterceptor{} - d.(ddl.DDLForTest).SetInterceptor(intercept) } func TestParallelAlterSchemaCharsetAndCollate(t *testing.T) { @@ -1657,8 +1648,6 @@ func TestCreateExpressionIndex(t *testing.T) { if checkErr != nil { return } - err := originalCallback.OnChanged(nil) - require.NoError(t, err) switch job.SchemaState { case model.StateDeleteOnly: for _, sql := range stateDeleteOnlySQLs { @@ -1839,8 +1828,6 @@ func TestDropExpressionIndex(t *testing.T) { if checkErr != nil { return } - err := originalCallback.OnChanged(nil) - require.NoError(t, err) switch job.SchemaState { case model.StateDeleteOnly: for _, sql := range stateDeleteOnlySQLs { diff --git a/pkg/ddl/ddl.go b/pkg/ddl/ddl.go index 7e64b11b128e6..67b27238456e6 100644 --- a/pkg/ddl/ddl.go +++ b/pkg/ddl/ddl.go @@ -196,8 +196,6 @@ type DDL interface { GetHook() Callback // SetHook sets the hook. SetHook(h Callback) - // GetInfoSchemaWithInterceptor gets the infoschema binding to d. It's exported for testing. - GetInfoSchemaWithInterceptor(ctx sessionctx.Context) infoschema.InfoSchema // GetMinJobIDRefresher gets the MinJobIDRefresher, this api only works after Start. GetMinJobIDRefresher() *systable.MinJobIDRefresher } @@ -357,12 +355,11 @@ type ddlCtx struct { mu hookStruct } -// TODO remove it after we remove hook and interceptor. +// TODO remove it after we remove hook. type hookStruct struct { sync.RWMutex // see newDefaultCallBack for its value in normal flow. - hook Callback - interceptor Interceptor + hook Callback } // the schema synchronization mechanism now requires strict incremental schema versions. @@ -705,7 +702,6 @@ func newDDL(ctx context.Context, options ...Option) (*ddl, *executor) { ddlCtx.reorgCtx.reorgCtxMap = make(map[int64]*reorgCtx) ddlCtx.jobCtx.jobCtxMap = make(map[int64]*JobContext) ddlCtx.mu.hook = opt.Hook - ddlCtx.mu.interceptor = &BaseInterceptor{} ctx = kv.WithInternalSourceType(ctx, kv.InternalTxnDDL) ddlCtx.ctx, ddlCtx.cancel = context.WithCancel(ctx) ddlCtx.schemaVersionManager = newSchemaVersionManager(ddlCtx.ctx, opt.EtcdCli) @@ -955,17 +951,6 @@ func (d *ddl) GetLease() time.Duration { return lease } -// GetInfoSchemaWithInterceptor gets the infoschema binding to d. It's exported for testing. -// Please don't use this function, it is used by TestParallelDDLBeforeRunDDLJob to intercept the calling of d.infoHandle.Get(), use d.infoHandle.Get() instead. -// Otherwise, the TestParallelDDLBeforeRunDDLJob will hang up forever. -func (d *ddl) GetInfoSchemaWithInterceptor(ctx sessionctx.Context) infoschema.InfoSchema { - is := d.infoCache.GetLatest() - - d.mu.RLock() - defer d.mu.RUnlock() - return d.mu.interceptor.OnGetInfoSchema(ctx, is) -} - func (e *executor) genGlobalIDs(count int) ([]int64, error) { var ret []int64 ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL) @@ -1295,18 +1280,6 @@ func (e *executor) DoDDLJobWrapper(ctx sessionctx.Context, jobW *JobWrapper) err } } -func (d *ddl) callHookOnChanged(job *model.Job, err error) error { - if job.State == model.JobStateNone { - // We don't call the hook if the job haven't run yet. - return err - } - d.mu.RLock() - defer d.mu.RUnlock() - - err = d.mu.hook.OnChanged(err) - return errors.Trace(err) -} - // SetBinlogClient implements DDL.SetBinlogClient interface. func (d *ddl) SetBinlogClient(binlogCli *pumpcli.PumpsClient) { d.binlogCli = binlogCli diff --git a/pkg/ddl/ddl_test.go b/pkg/ddl/ddl_test.go index 13c0a7284333d..6a88570f222ea 100644 --- a/pkg/ddl/ddl_test.go +++ b/pkg/ddl/ddl_test.go @@ -38,21 +38,11 @@ import ( // DDLForTest exports for testing. type DDLForTest interface { - // SetInterceptor sets the interceptor. - SetInterceptor(h Interceptor) NewReorgCtx(jobID int64, rowCount int64) *reorgCtx GetReorgCtx(jobID int64) *reorgCtx RemoveReorgCtx(id int64) } -// SetInterceptor implements DDL.SetInterceptor interface. -func (d *ddl) SetInterceptor(i Interceptor) { - d.mu.Lock() - defer d.mu.Unlock() - - d.mu.interceptor = i -} - // IsReorgCanceled exports for testing. func (rc *reorgCtx) IsReorgCanceled() bool { return rc.isReorgCanceled() diff --git a/pkg/ddl/ddl_tiflash_api.go b/pkg/ddl/ddl_tiflash_api.go index 8285a453006bb..9f4c2512f0019 100644 --- a/pkg/ddl/ddl_tiflash_api.go +++ b/pkg/ddl/ddl_tiflash_api.go @@ -441,7 +441,7 @@ func (d *ddl) refreshTiFlashTicker(ctx sessionctx.Context, pollTiFlashContext *T pollTiFlashContext.PollCounter++ // Start to process every table. - schema := d.GetInfoSchemaWithInterceptor(ctx) + schema := d.infoCache.GetLatest() if schema == nil { return errors.New("Schema is nil") } diff --git a/pkg/ddl/executor.go b/pkg/ddl/executor.go index a0537ee74b085..7857bc8e7cfe6 100644 --- a/pkg/ddl/executor.go +++ b/pkg/ddl/executor.go @@ -335,7 +335,6 @@ func (e *executor) CreateSchemaWithInfo( } err := e.DoDDLJob(ctx, job) - err = e.callHookOnChanged(job, err) if infoschema.ErrDatabaseExists.Equal(err) && onExist == OnExistIgnore { ctx.GetSessionVars().StmtCtx.AppendNote(err) @@ -377,7 +376,6 @@ func (e *executor) ModifySchemaCharsetAndCollate(ctx sessionctx.Context, stmt *a SQLMode: ctx.GetSessionVars().SQLMode, } err = e.DoDDLJob(ctx, job) - err = e.callHookOnChanged(job, err) return errors.Trace(err) } @@ -419,7 +417,6 @@ func (e *executor) ModifySchemaDefaultPlacement(ctx sessionctx.Context, stmt *as }) } err = e.DoDDLJob(ctx, job) - err = e.callHookOnChanged(job, err) return errors.Trace(err) } @@ -588,7 +585,6 @@ func (e *executor) ModifySchemaSetTiFlashReplica(sctx sessionctx.Context, stmt * SQLMode: sctx.GetSessionVars().SQLMode, } err := e.DoDDLJob(sctx, job) - err = e.callHookOnChanged(job, err) if err != nil { oneFail = tbl.ID fail++ @@ -666,7 +662,6 @@ func (e *executor) AlterTablePlacement(ctx sessionctx.Context, ident ast.Ident, } err = e.DoDDLJob(ctx, job) - err = e.callHookOnChanged(job, err) return errors.Trace(err) } @@ -795,7 +790,6 @@ func (e *executor) DropSchema(ctx sessionctx.Context, stmt *ast.DropDatabaseStmt } err = e.DoDDLJob(ctx, job) - err = e.callHookOnChanged(job, err) if err != nil { if infoschema.ErrDatabaseNotExists.Equal(err) { if stmt.IfExists { @@ -845,7 +839,6 @@ func (e *executor) RecoverSchema(ctx sessionctx.Context, recoverSchemaInfo *Reco SQLMode: ctx.GetSessionVars().SQLMode, } err := e.DoDDLJob(ctx, job) - err = e.callHookOnChanged(job, err) return errors.Trace(err) } @@ -3079,7 +3072,6 @@ func (e *executor) CreateTableWithInfo( err = e.createTableWithInfoPost(ctx, tbInfo, job.SchemaID) } - err = e.callHookOnChanged(job, err) return errors.Trace(err) } @@ -3170,16 +3162,16 @@ func (e *executor) BatchCreateTableWithInfo(ctx sessionctx.Context, ctx.GetSessionVars().StmtCtx.AppendNote(err) err = nil } - return errors.Trace(e.callHookOnChanged(jobW.Job, err)) + return errors.Trace(err) } for j := range args { if err = e.createTableWithInfoPost(ctx, args[j], jobW.SchemaID); err != nil { - return errors.Trace(e.callHookOnChanged(jobW.Job, err)) + return errors.Trace(err) } } - return e.callHookOnChanged(jobW.Job, err) + return nil } // BuildQueryStringFromJobs takes a slice of Jobs and concatenates their @@ -3298,7 +3290,6 @@ func (e *executor) CreatePlacementPolicyWithInfo(ctx sessionctx.Context, policy SQLMode: ctx.GetSessionVars().SQLMode, } err = e.DoDDLJob(ctx, job) - err = e.callHookOnChanged(job, err) return errors.Trace(err) } @@ -3368,7 +3359,6 @@ func (e *executor) FlashbackCluster(ctx sessionctx.Context, flashbackTS uint64) SQLMode: ctx.GetSessionVars().SQLMode, } err = e.DoDDLJob(ctx, job) - err = e.callHookOnChanged(job, err) return errors.Trace(err) } @@ -3412,7 +3402,6 @@ func (e *executor) RecoverTable(ctx sessionctx.Context, recoverInfo *RecoverInfo SQLMode: ctx.GetSessionVars().SQLMode, } err = e.DoDDLJob(ctx, job) - err = e.callHookOnChanged(job, err) return errors.Trace(err) } @@ -4396,7 +4385,6 @@ func (e *executor) RebaseAutoID(ctx sessionctx.Context, ident ast.Ident, newBase SQLMode: ctx.GetSessionVars().SQLMode, } err = e.DoDDLJob(ctx, job) - err = e.callHookOnChanged(job, err) return errors.Trace(err) } @@ -4450,7 +4438,6 @@ func (e *executor) ShardRowID(ctx sessionctx.Context, tableIdent ast.Ident, uVal SQLMode: ctx.GetSessionVars().SQLMode, } err = e.DoDDLJob(ctx, job) - err = e.callHookOnChanged(job, err) return errors.Trace(err) } @@ -4649,7 +4636,6 @@ func (e *executor) AddColumn(ctx sessionctx.Context, ti ast.Ident, spec *ast.Alt } err = e.DoDDLJob(ctx, job) - err = e.callHookOnChanged(job, err) return errors.Trace(err) } @@ -4748,7 +4734,6 @@ func (e *executor) AddTablePartitions(ctx sessionctx.Context, ident ast.Ident, s ctx.GetSessionVars().StmtCtx.AppendNote(err) return nil } - err = e.callHookOnChanged(job, err) return errors.Trace(err) } @@ -4939,7 +4924,6 @@ func (e *executor) AlterTablePartitioning(ctx sessionctx.Context, ident ast.Iden // No preSplitAndScatter here, it will be done by the worker in onReorganizePartition instead. err = e.DoDDLJob(ctx, job) - err = e.callHookOnChanged(job, err) if err == nil { ctx.GetSessionVars().StmtCtx.AppendWarning(errors.NewNoStackError("The statistics of new partitions will be outdated after reorganizing partitions. Please use 'ANALYZE TABLE' statement if you want to update it now")) } @@ -5005,7 +4989,7 @@ func (e *executor) ReorganizePartitions(ctx sessionctx.Context, ident ast.Ident, // No preSplitAndScatter here, it will be done by the worker in onReorganizePartition instead. err = e.DoDDLJob(ctx, job) - err = e.callHookOnChanged(job, err) + failpoint.InjectCall("afterReorganizePartition") if err == nil { ctx.GetSessionVars().StmtCtx.AppendWarning(errors.NewNoStackError("The statistics of related partitions will be outdated after reorganizing partitions. Please use 'ANALYZE TABLE' statement if you want to update it now")) } @@ -5071,7 +5055,6 @@ func (e *executor) RemovePartitioning(ctx sessionctx.Context, ident ast.Ident, s // No preSplitAndScatter here, it will be done by the worker in onReorganizePartition instead. err = e.DoDDLJob(ctx, job) - err = e.callHookOnChanged(job, err) return errors.Trace(err) } @@ -5279,7 +5262,6 @@ func (e *executor) TruncateTablePartition(ctx sessionctx.Context, ident ast.Iden } err = e.DoDDLJob(ctx, job) - err = e.callHookOnChanged(job, err) if err != nil { return errors.Trace(err) } @@ -5383,7 +5365,6 @@ func (e *executor) DropTablePartition(ctx sessionctx.Context, ident ast.Ident, s } return errors.Trace(err) } - err = e.callHookOnChanged(job, err) return errors.Trace(err) } @@ -5594,8 +5575,7 @@ func (e *executor) ExchangeTablePartition(ctx sessionctx.Context, ident ast.Iden return errors.Trace(err) } ctx.GetSessionVars().StmtCtx.AppendWarning(errors.NewNoStackError("after the exchange, please analyze related table of the exchange to update statistics")) - err = e.callHookOnChanged(job, err) - return errors.Trace(err) + return nil } // DropColumn will drop a column from the table, now we don't support drop the column with index covered. @@ -5633,7 +5613,6 @@ func (e *executor) DropColumn(ctx sessionctx.Context, ti ast.Ident, spec *ast.Al } err = e.DoDDLJob(ctx, job) - err = e.callHookOnChanged(job, err) return errors.Trace(err) } @@ -6474,7 +6453,6 @@ func (e *executor) ChangeColumn(ctx context.Context, sctx sessionctx.Context, id sctx.GetSessionVars().StmtCtx.AppendNote(err) return nil } - err = e.callHookOnChanged(job, err) return errors.Trace(err) } @@ -6536,7 +6514,6 @@ func (e *executor) RenameColumn(ctx sessionctx.Context, ident ast.Ident, spec *a SQLMode: ctx.GetSessionVars().SQLMode, } err = e.DoDDLJob(ctx, job) - err = e.callHookOnChanged(job, err) return errors.Trace(err) } @@ -6567,7 +6544,6 @@ func (e *executor) ModifyColumn(ctx context.Context, sctx sessionctx.Context, id sctx.GetSessionVars().StmtCtx.AppendNote(err) return nil } - err = e.callHookOnChanged(job, err) return errors.Trace(err) } @@ -6626,7 +6602,6 @@ func (e *executor) AlterColumn(ctx sessionctx.Context, ident ast.Ident, spec *as } err = e.DoDDLJob(ctx, job) - err = e.callHookOnChanged(job, err) return errors.Trace(err) } @@ -6660,7 +6635,6 @@ func (e *executor) AlterTableComment(ctx sessionctx.Context, ident ast.Ident, sp } err = e.DoDDLJob(ctx, job) - err = e.callHookOnChanged(job, err) return errors.Trace(err) } @@ -6689,7 +6663,6 @@ func (e *executor) AlterTableAutoIDCache(ctx sessionctx.Context, ident ast.Ident } err = e.DoDDLJob(ctx, job) - err = e.callHookOnChanged(job, err) return errors.Trace(err) } @@ -6743,7 +6716,6 @@ func (e *executor) AlterTableCharsetAndCollate(ctx sessionctx.Context, ident ast SQLMode: ctx.GetSessionVars().SQLMode, } err = e.DoDDLJob(ctx, job) - err = e.callHookOnChanged(job, err) return errors.Trace(err) } @@ -6815,7 +6787,6 @@ func (e *executor) AlterTableSetTiFlashReplica(ctx sessionctx.Context, ident ast SQLMode: ctx.GetSessionVars().SQLMode, } err = e.DoDDLJob(ctx, job) - err = e.callHookOnChanged(job, err) return errors.Trace(err) } @@ -6873,7 +6844,6 @@ func (e *executor) AlterTableTTLInfoOrEnable(ctx sessionctx.Context, ident ast.I } err = e.DoDDLJob(ctx, job) - err = e.callHookOnChanged(job, err) return errors.Trace(err) } @@ -6906,7 +6876,6 @@ func (e *executor) AlterTableRemoveTTL(ctx sessionctx.Context, ident ast.Ident) SQLMode: ctx.GetSessionVars().SQLMode, } err = e.DoDDLJob(ctx, job) - err = e.callHookOnChanged(job, err) return errors.Trace(err) } @@ -7042,7 +7011,6 @@ func (e *executor) UpdateTableReplicaInfo(ctx sessionctx.Context, physicalID int SQLMode: ctx.GetSessionVars().SQLMode, } err := e.DoDDLJob(ctx, job) - err = e.callHookOnChanged(job, err) return errors.Trace(err) } @@ -7150,7 +7118,6 @@ func (e *executor) RenameIndex(ctx sessionctx.Context, ident ast.Ident, spec *as } err = e.DoDDLJob(ctx, job) - err = e.callHookOnChanged(job, err) return errors.Trace(err) } @@ -7295,7 +7262,6 @@ func (e *executor) dropTableObject( } err = e.DoDDLJob(ctx, job) - err = e.callHookOnChanged(job, err) if infoschema.ErrDatabaseNotExists.Equal(err) || infoschema.ErrTableNotExists.Equal(err) { notExistTables = append(notExistTables, fullti.String()) continue @@ -7382,7 +7348,6 @@ func (e *executor) TruncateTable(ctx sessionctx.Context, ti ast.Ident) error { ctx.AddTableLock([]model.TableLockTpInfo{{SchemaID: schema.ID, TableID: newTableID, Tp: tb.Meta().Lock.Tp}}) } err = e.DoDDLJob(ctx, job) - err = e.callHookOnChanged(job, err) if err != nil { if config.TableLockEnabled() { ctx.ReleaseTableLockByTableIDs([]int64{newTableID}) @@ -7456,7 +7421,6 @@ func (e *executor) renameTable(ctx sessionctx.Context, oldIdent, newIdent ast.Id } err = e.DoDDLJob(ctx, job) - err = e.callHookOnChanged(job, err) return errors.Trace(err) } @@ -7517,7 +7481,6 @@ func (e *executor) renameTables(ctx sessionctx.Context, oldIdents, newIdents []a } err = e.DoDDLJob(ctx, job) - err = e.callHookOnChanged(job, err) return errors.Trace(err) } @@ -7719,7 +7682,6 @@ func (e *executor) CreatePrimaryKey(ctx sessionctx.Context, ti ast.Ident, indexN job.ReorgMeta = reorgMeta err = e.DoDDLJob(ctx, job) - err = e.callHookOnChanged(job, err) return errors.Trace(err) } @@ -7987,7 +7949,6 @@ func (e *executor) createIndex(ctx sessionctx.Context, ti ast.Ident, keyType ast ctx.GetSessionVars().StmtCtx.AppendNote(err) return nil } - err = e.callHookOnChanged(job, err) return errors.Trace(err) } @@ -8187,7 +8148,6 @@ func (e *executor) CreateForeignKey(ctx sessionctx.Context, ti ast.Ident, fkName } err = e.DoDDLJob(ctx, job) - err = e.callHookOnChanged(job, err) return errors.Trace(err) } @@ -8217,7 +8177,6 @@ func (e *executor) DropForeignKey(ctx sessionctx.Context, ti ast.Ident, fkName m } err = e.DoDDLJob(ctx, job) - err = e.callHookOnChanged(job, err) return errors.Trace(err) } @@ -8310,7 +8269,6 @@ func (e *executor) dropIndex(ctx sessionctx.Context, ti ast.Ident, indexName mod } err = e.DoDDLJob(ctx, job) - err = e.callHookOnChanged(job, err) return errors.Trace(err) } @@ -8596,7 +8554,6 @@ func (e *executor) LockTables(ctx sessionctx.Context, stmt *ast.LockTablesStmt) ctx.ReleaseTableLocks(unlockTables) ctx.AddTableLock(lockTables) } - err = e.callHookOnChanged(job, err) return errors.Trace(err) } @@ -8644,7 +8601,6 @@ func (e *executor) UnlockTables(ctx sessionctx.Context, unlockTables []model.Tab if err == nil { ctx.ReleaseAllTableLocks() } - err = e.callHookOnChanged(job, err) return errors.Trace(err) } @@ -8671,7 +8627,6 @@ func (d *ddl) CleanDeadTableLock(unlockTables []model.TableLockTpInfo, se model. } defer d.sessPool.Put(ctx) err = d.executor.DoDDLJob(ctx, job) - err = d.callHookOnChanged(job, err) return errors.Trace(err) } @@ -8743,7 +8698,6 @@ func (e *executor) CleanupTableLock(ctx sessionctx.Context, tables []*ast.TableN if err == nil { ctx.ReleaseTableLocks(cleanupTables) } - err = e.callHookOnChanged(job, err) return errors.Trace(err) } @@ -8833,7 +8787,6 @@ func (e *executor) RepairTable(ctx sessionctx.Context, createStmt *ast.CreateTab // Remove the old TableInfo from repairInfo before domain reload. domainutil.RepairInfo.RemoveFromRepairInfo(oldDBInfo.Name.L, oldTableInfo.Name.L) } - err = e.callHookOnChanged(job, err) return errors.Trace(err) } @@ -8911,7 +8864,6 @@ func (e *executor) AlterSequence(ctx sessionctx.Context, stmt *ast.AlterSequence } err = e.DoDDLJob(ctx, job) - err = e.callHookOnChanged(job, err) return errors.Trace(err) } @@ -8951,7 +8903,6 @@ func (e *executor) AlterIndexVisibility(ctx sessionctx.Context, ident ast.Ident, } err = e.DoDDLJob(ctx, job) - err = e.callHookOnChanged(job, err) return errors.Trace(err) } @@ -8987,7 +8938,6 @@ func (e *executor) AlterTableAttributes(ctx sessionctx.Context, ident ast.Ident, return errors.Trace(err) } - err = e.callHookOnChanged(job, err) return errors.Trace(err) } @@ -9031,7 +8981,6 @@ func (e *executor) AlterTablePartitionAttributes(ctx sessionctx.Context, ident a return errors.Trace(err) } - err = e.callHookOnChanged(job, err) return errors.Trace(err) } @@ -9113,7 +9062,6 @@ func (e *executor) AlterTablePartitionPlacement(ctx sessionctx.Context, tableIde } err = e.DoDDLJob(ctx, job) - err = e.callHookOnChanged(job, err) return errors.Trace(err) } @@ -9277,7 +9225,6 @@ func (e *executor) AddResourceGroup(ctx sessionctx.Context, stmt *ast.CreateReso SQLMode: ctx.GetSessionVars().SQLMode, } err = e.DoDDLJob(ctx, job) - err = e.callHookOnChanged(job, err) return err } @@ -9328,7 +9275,6 @@ func (e *executor) DropResourceGroup(ctx sessionctx.Context, stmt *ast.DropResou SQLMode: ctx.GetSessionVars().SQLMode, } err = e.DoDDLJob(ctx, job) - err = e.callHookOnChanged(job, err) return err } @@ -9385,7 +9331,6 @@ func (e *executor) AlterResourceGroup(ctx sessionctx.Context, stmt *ast.AlterRes SQLMode: ctx.GetSessionVars().SQLMode, } err = e.DoDDLJob(ctx, job) - err = e.callHookOnChanged(job, err) return err } @@ -9450,7 +9395,6 @@ func (e *executor) DropPlacementPolicy(ctx sessionctx.Context, stmt *ast.DropPla SQLMode: ctx.GetSessionVars().SQLMode, } err = e.DoDDLJob(ctx, job) - err = e.callHookOnChanged(job, err) return errors.Trace(err) } @@ -9489,7 +9433,6 @@ func (e *executor) AlterPlacementPolicy(ctx sessionctx.Context, stmt *ast.AlterP SQLMode: ctx.GetSessionVars().SQLMode, } err = e.DoDDLJob(ctx, job) - err = e.callHookOnChanged(job, err) return errors.Trace(err) } @@ -9547,8 +9490,7 @@ func (e *executor) AlterTableCache(sctx sessionctx.Context, ti ast.Ident) (err e SQLMode: sctx.GetSessionVars().SQLMode, } - err = e.DoDDLJob(sctx, job) - return e.callHookOnChanged(job, err) + return e.DoDDLJob(sctx, job) } func checkCacheTableSize(store kv.Storage, tableID int64) (bool, error) { @@ -9608,8 +9550,7 @@ func (e *executor) AlterTableNoCache(ctx sessionctx.Context, ti ast.Ident) (err SQLMode: ctx.GetSessionVars().SQLMode, } - err = e.DoDDLJob(ctx, job) - return e.callHookOnChanged(job, err) + return e.DoDDLJob(ctx, job) } // checkTooBigFieldLengthAndTryAutoConvert will check whether the field length is too big @@ -9702,7 +9643,6 @@ func (e *executor) CreateCheckConstraint(ctx sessionctx.Context, ti ast.Ident, c } err = e.DoDDLJob(ctx, job) - err = e.callHookOnChanged(job, err) return errors.Trace(err) } @@ -9736,7 +9676,6 @@ func (e *executor) DropCheckConstraint(ctx sessionctx.Context, ti ast.Ident, con } err = e.DoDDLJob(ctx, job) - err = e.callHookOnChanged(job, err) return errors.Trace(err) } @@ -9770,20 +9709,6 @@ func (e *executor) AlterCheckConstraint(ctx sessionctx.Context, ti ast.Ident, co } err = e.DoDDLJob(ctx, job) - err = e.callHookOnChanged(job, err) - return errors.Trace(err) -} - -// TODO remove it later. -func (e *executor) callHookOnChanged(job *model.Job, err error) error { - if job.State == model.JobStateNone { - // We don't call the hook if the job haven't run yet. - return err - } - e.mu.RLock() - defer e.mu.RUnlock() - - err = e.mu.hook.OnChanged(err) return errors.Trace(err) } diff --git a/pkg/ddl/index.go b/pkg/ddl/index.go index 3b8265725e5a3..6331a53a2cea4 100644 --- a/pkg/ddl/index.go +++ b/pkg/ddl/index.go @@ -1896,9 +1896,6 @@ func (w *worker) addTableIndex(t table.Table, reorgInfo *reorgInfo) error { if err != nil { break } - w.ddlCtx.mu.RLock() - w.ddlCtx.mu.hook.OnUpdateReorgInfo(reorgInfo.Job, reorgInfo.PhysicalTableID) - w.ddlCtx.mu.RUnlock() finish, err = updateReorgInfo(w.sessPool, tbl, reorgInfo) if err != nil { diff --git a/pkg/ddl/job_table.go b/pkg/ddl/job_table.go index bae525c8497b3..71de6aea62406 100644 --- a/pkg/ddl/job_table.go +++ b/pkg/ddl/job_table.go @@ -617,7 +617,7 @@ func (s *jobScheduler) transitOneJobStepAndWaitSync(wk *worker, job *model.Job) } }) - failpoint.InjectCall("beforeWaitSchemaChanged", job) + failpoint.InjectCall("beforeWaitSchemaChanged", job, schemaVer) // Here means the job enters another state (delete only, write only, public, etc...) or is cancelled. // If the job is done or still running or rolling back, we will wait 2 * lease time or util MDL synced to guarantee other servers to update // the newest schema. @@ -627,13 +627,6 @@ func (s *jobScheduler) transitOneJobStepAndWaitSync(wk *worker, job *model.Job) s.cleanMDLInfo(job, ownerID) s.synced(job) - if RunInGoTest { - // s.mu.hook is initialed from domain / test callback, which will force the owner host update schema diff synchronously. - s.mu.RLock() - s.mu.hook.OnSchemaStateChanged(schemaVer) - s.mu.RUnlock() - } - s.mu.RLock() s.mu.hook.OnJobUpdated(job) s.mu.RUnlock() diff --git a/pkg/ddl/multi_schema_change.go b/pkg/ddl/multi_schema_change.go index a1d753d9d6eea..e2186fdea00f3 100644 --- a/pkg/ddl/multi_schema_change.go +++ b/pkg/ddl/multi_schema_change.go @@ -135,8 +135,7 @@ func (e *executor) MultiSchemaChange(ctx sessionctx.Context, ti ast.Ident, info return errors.Trace(err) } mergeAddIndex(info) - err = e.DoDDLJob(ctx, job) - return e.callHookOnChanged(job, err) + return e.DoDDLJob(ctx, job) } func containsDistTaskSubJob(subJobs []*model.SubJob) bool { diff --git a/pkg/ddl/multi_schema_change_test.go b/pkg/ddl/multi_schema_change_test.go index f80737a9dfdcf..7202d9bb99bec 100644 --- a/pkg/ddl/multi_schema_change_test.go +++ b/pkg/ddl/multi_schema_change_test.go @@ -28,6 +28,7 @@ import ( "github.com/pingcap/tidb/pkg/sessionctx" "github.com/pingcap/tidb/pkg/sessionctx/variable" "github.com/pingcap/tidb/pkg/testkit" + "github.com/pingcap/tidb/pkg/testkit/testfailpoint" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -735,7 +736,7 @@ func TestMultiSchemaChangeNoSubJobs(t *testing.T) { } func TestMultiSchemaChangeSchemaVersion(t *testing.T) { - store, dom := testkit.CreateMockStoreAndDomain(t) + store := testkit.CreateMockStore(t) tk := testkit.NewTestKit(t, store) tk.MustExec("use test;") tk.MustExec("create table t(a int, b int, c int, d int)") @@ -743,22 +744,18 @@ func TestMultiSchemaChangeSchemaVersion(t *testing.T) { schemaVerMap := map[int64]struct{}{} - originHook := dom.DDL().GetHook() - hook := &callback.TestDDLCallback{Do: dom} - hook.OnJobSchemaStateChanged = func(schemaVer int64) { + testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/beforeWaitSchemaChanged", func(_ *model.Job, schemaVer int64) { if schemaVer != 0 { // No same return schemaVer during multi-schema change _, ok := schemaVerMap[schemaVer] assert.False(t, ok) schemaVerMap[schemaVer] = struct{}{} } - } - dom.DDL().SetHook(hook) + }) tk.MustExec("alter table t drop column b, drop column c") tk.MustExec("alter table t add column b int, add column c int") tk.MustExec("alter table t add index k(b), add column e int") tk.MustExec("alter table t alter index k invisible, drop column e") - dom.DDL().SetHook(originHook) } func TestMultiSchemaChangeMixedWithUpdate(t *testing.T) { diff --git a/pkg/ddl/reorg_partition_test.go b/pkg/ddl/reorg_partition_test.go index f06e0303f5381..a39280c4c1727 100644 --- a/pkg/ddl/reorg_partition_test.go +++ b/pkg/ddl/reorg_partition_test.go @@ -32,6 +32,7 @@ import ( "github.com/pingcap/tidb/pkg/table" "github.com/pingcap/tidb/pkg/tablecodec" "github.com/pingcap/tidb/pkg/testkit" + "github.com/pingcap/tidb/pkg/testkit/testfailpoint" "github.com/pingcap/tidb/pkg/util/mathutil" "github.com/stretchr/testify/require" "go.uber.org/zap" @@ -142,15 +143,6 @@ func getAllDataForPhysicalTable(t *testing.T, ctx sessionctx.Context, physTable type TestReorgDDLCallback struct { *callback.TestDDLCallback - syncChan chan bool -} - -func (tc *TestReorgDDLCallback) OnChanged(err error) error { - err = tc.TestDDLCallback.OnChanged(err) - <-tc.syncChan - // We want to wait here - <-tc.syncChan - return err } func TestReorgPartitionConcurrent(t *testing.T) { @@ -170,8 +162,13 @@ func TestReorgPartitionConcurrent(t *testing.T) { defer dom.DDL().SetHook(originHook) syncOnChanged := make(chan bool) defer close(syncOnChanged) - hook := &TestReorgDDLCallback{TestDDLCallback: &callback.TestDDLCallback{Do: dom}, syncChan: syncOnChanged} + hook := &TestReorgDDLCallback{TestDDLCallback: &callback.TestDDLCallback{Do: dom}} dom.DDL().SetHook(hook) + testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/afterReorganizePartition", func() { + <-syncOnChanged + // We want to wait here + <-syncOnChanged + }) wait := make(chan bool) defer close(wait) diff --git a/pkg/ddl/schematracker/checker.go b/pkg/ddl/schematracker/checker.go index fb96d024528ed..f8ffeb46919d1 100644 --- a/pkg/ddl/schematracker/checker.go +++ b/pkg/ddl/schematracker/checker.go @@ -61,13 +61,15 @@ type Checker struct { tracker SchemaTracker closed atomic.Bool realExecutor ddl.Executor + infoCache *infoschema.InfoCache } // NewChecker creates a Checker. -func NewChecker(realDDL ddl.DDL, realExecutor ddl.Executor) *Checker { +func NewChecker(realDDL ddl.DDL, realExecutor ddl.Executor, infoCache *infoschema.InfoCache) *Checker { return &Checker{ realDDL: realDDL, realExecutor: realExecutor, + infoCache: infoCache, tracker: NewSchemaTracker(2), } } @@ -91,7 +93,7 @@ func (d *Checker) checkDBInfo(ctx sessionctx.Context, dbName model.CIStr) { if d.closed.Load() { return } - dbInfo, _ := d.realDDL.GetInfoSchemaWithInterceptor(ctx).SchemaByName(dbName) + dbInfo, _ := d.infoCache.GetLatest().SchemaByName(dbName) dbInfo2 := d.tracker.SchemaByName(dbName) if dbInfo == nil || dbInfo2 == nil { @@ -130,7 +132,7 @@ func (d *Checker) checkTableInfo(ctx sessionctx.Context, dbName, tableName model return } - tableInfo, _ := d.realDDL.GetInfoSchemaWithInterceptor(ctx).TableByName(context.Background(), dbName, tableName) + tableInfo, _ := d.infoCache.GetLatest().TableByName(context.Background(), dbName, tableName) tableInfo2, _ := d.tracker.TableByName(context.Background(), dbName, tableName) if tableInfo == nil || tableInfo2 == nil { @@ -553,11 +555,6 @@ func (d *Checker) SetHook(h ddl.Callback) { d.realDDL.SetHook(h) } -// GetInfoSchemaWithInterceptor implements the DDL interface. -func (d *Checker) GetInfoSchemaWithInterceptor(ctx sessionctx.Context) infoschema.InfoSchema { - return d.realDDL.GetInfoSchemaWithInterceptor(ctx) -} - // DoDDLJob implements the DDL interface. func (d *Checker) DoDDLJob(ctx sessionctx.Context, job *model.Job) error { de := d.realExecutor.(ddl.ExecutorForTest) @@ -584,7 +581,7 @@ type storageAndMore interface { // StorageDDLInjector wraps kv.Storage to inject checker to domain's DDL in bootstrap time. type StorageDDLInjector struct { storageAndMore - Injector func(ddl.DDL, ddl.Executor) *Checker + Injector func(ddl.DDL, ddl.Executor, *infoschema.InfoCache) *Checker } // NewStorageDDLInjector creates a new StorageDDLInjector to inject Checker. diff --git a/pkg/ddl/tests/fail/fail_db_test.go b/pkg/ddl/tests/fail/fail_db_test.go index 65776b56a0f25..a8c4b3959ce42 100644 --- a/pkg/ddl/tests/fail/fail_db_test.go +++ b/pkg/ddl/tests/fail/fail_db_test.go @@ -91,8 +91,6 @@ func TestHalfwayCancelOperations(t *testing.T) { tk.MustQuery("select * from t").Check(testkit.Rows("1")) // Execute ddl statement reload schema tk.MustExec("alter table t comment 'test1'") - err = s.dom.DDL().GetHook().OnChanged(nil) - require.NoError(t, err) tk = testkit.NewTestKit(t, s.store) tk.MustExec("use cancel_job_db") @@ -120,8 +118,6 @@ func TestHalfwayCancelOperations(t *testing.T) { tk.MustQuery("select * from tx").Check(testkit.Rows("1")) // Execute ddl statement reload schema. tk.MustExec("alter table tx comment 'tx'") - err = s.dom.DDL().GetHook().OnChanged(nil) - require.NoError(t, err) tk = testkit.NewTestKit(t, s.store) tk.MustExec("use cancel_job_db") @@ -147,8 +143,6 @@ func TestHalfwayCancelOperations(t *testing.T) { tk.MustQuery("select * from nt").Check(testkit.Rows("7")) // Execute ddl statement reload schema. tk.MustExec("alter table pt comment 'pt'") - err = s.dom.DDL().GetHook().OnChanged(nil) - require.NoError(t, err) tk = testkit.NewTestKit(t, s.store) tk.MustExec("use cancel_job_db") diff --git a/pkg/ddl/tests/indexmerge/merge_test.go b/pkg/ddl/tests/indexmerge/merge_test.go index 0b60d4c4a2115..712578aea4a80 100644 --- a/pkg/ddl/tests/indexmerge/merge_test.go +++ b/pkg/ddl/tests/indexmerge/merge_test.go @@ -96,7 +96,7 @@ func TestAddPrimaryKeyMergeProcess(t *testing.T) { var runDML, backfillDone bool // only trigger reload when schema version changed testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/domain/disableOnTickReload", "return(true)") - testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/beforeWaitSchemaChanged", func(job *model.Job) { + testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/beforeWaitSchemaChanged", func(job *model.Job, _ int64) { if !runDML && job.Type == model.ActionAddPrimaryKey && job.SchemaState == model.StateWriteReorganization { idx := testutil.FindIdxInfo(dom, "test", "t", "primary") if idx == nil || idx.BackfillState != model.BackfillStateRunning || job.SnapshotVer == 0 { diff --git a/pkg/ddl/util/callback/BUILD.bazel b/pkg/ddl/util/callback/BUILD.bazel index a5a677b5f9bc2..4ae863b270d46 100644 --- a/pkg/ddl/util/callback/BUILD.bazel +++ b/pkg/ddl/util/callback/BUILD.bazel @@ -8,9 +8,7 @@ go_library( deps = [ "//pkg/ddl", "//pkg/ddl/logutil", - "//pkg/infoschema", "//pkg/parser/model", - "//pkg/sessionctx", "@org_uber_go_zap//:zap", ], ) @@ -21,8 +19,5 @@ go_test( srcs = ["callback_test.go"], embed = [":callback"], flaky = True, - deps = [ - "//pkg/ddl", - "@com_github_stretchr_testify//require", - ], + deps = ["//pkg/ddl"], ) diff --git a/pkg/ddl/util/callback/callback.go b/pkg/ddl/util/callback/callback.go index da4c7d63fffde..addc26e53799e 100644 --- a/pkg/ddl/util/callback/callback.go +++ b/pkg/ddl/util/callback/callback.go @@ -15,33 +15,14 @@ package callback import ( - "context" "sync/atomic" "github.com/pingcap/tidb/pkg/ddl" "github.com/pingcap/tidb/pkg/ddl/logutil" - "github.com/pingcap/tidb/pkg/infoschema" "github.com/pingcap/tidb/pkg/parser/model" - "github.com/pingcap/tidb/pkg/sessionctx" "go.uber.org/zap" ) -// TestInterceptor is a test interceptor in the ddl -type TestInterceptor struct { - *ddl.BaseInterceptor - - OnGetInfoSchemaExported func(ctx sessionctx.Context, is infoschema.InfoSchema) infoschema.InfoSchema -} - -// OnGetInfoSchema is to run when to call GetInfoSchema -func (ti *TestInterceptor) OnGetInfoSchema(ctx sessionctx.Context, is infoschema.InfoSchema) infoschema.InfoSchema { - if ti.OnGetInfoSchemaExported != nil { - return ti.OnGetInfoSchemaExported(ctx, is) - } - - return ti.BaseInterceptor.OnGetInfoSchema(ctx, is) -} - // TestDDLCallback is used to customize user callback themselves. type TestDDLCallback struct { *ddl.BaseCallback @@ -49,46 +30,9 @@ type TestDDLCallback struct { // domain to reload schema before your ddl stepping into the next state change. Do ddl.SchemaLoader - onJobRunBefore func(*model.Job) - OnJobRunBeforeExported func(*model.Job) - OnJobRunAfterExported func(*model.Job) - onJobUpdated func(*model.Job) - OnJobUpdatedExported atomic.Pointer[func(*model.Job)] - onWatched func(ctx context.Context) - OnGetJobBeforeExported func() - OnGetJobAfterExported func(*model.Job) - OnJobSchemaStateChanged func(int64) - - OnUpdateReorgInfoExported func(job *model.Job, pid int64) -} - -// OnChanged mock the same behavior with the main DDL hook. -func (tc *TestDDLCallback) OnChanged(err error) error { - if err != nil { - return err - } - logutil.DDLLogger().Info("performing DDL change, must reload") - if tc.Do != nil { - err = tc.Do.Reload() - if err != nil { - logutil.DDLLogger().Error("performing DDL change failed", zap.Error(err)) - } - } - return nil -} - -// OnSchemaStateChanged mock the same behavior with the main ddl hook. -func (tc *TestDDLCallback) OnSchemaStateChanged(schemaVer int64) { - if tc.Do != nil { - if err := tc.Do.Reload(); err != nil { - logutil.DDLLogger().Warn("reload failed on schema state changed", zap.Error(err)) - } - } - - if tc.OnJobSchemaStateChanged != nil { - tc.OnJobSchemaStateChanged(schemaVer) - return - } + OnJobRunBeforeExported func(*model.Job) + OnJobRunAfterExported func(*model.Job) + OnJobUpdatedExported atomic.Pointer[func(*model.Job)] } // OnJobRunBefore is used to run the user customized logic of `onJobRunBefore` first. @@ -98,10 +42,6 @@ func (tc *TestDDLCallback) OnJobRunBefore(job *model.Job) { tc.OnJobRunBeforeExported(job) return } - if tc.onJobRunBefore != nil { - tc.onJobRunBefore(job) - return - } tc.BaseCallback.OnJobRunBefore(job) } @@ -127,32 +67,11 @@ func (tc *TestDDLCallback) OnJobUpdated(job *model.Job) { if job.State == model.JobStateSynced { return } - if tc.onJobUpdated != nil { - tc.onJobUpdated(job) - return - } tc.BaseCallback.OnJobUpdated(job) } -// OnWatched is used to run the user customized logic of `OnWatched` first. -func (tc *TestDDLCallback) OnWatched(ctx context.Context) { - if tc.onWatched != nil { - tc.onWatched(ctx) - return - } - - tc.BaseCallback.OnWatched(ctx) -} - // Clone copies the callback and take its reference func (tc *TestDDLCallback) Clone() *TestDDLCallback { return &*tc } - -// OnUpdateReorgInfo mock the same behavior with the main DDL reorg hook. -func (tc *TestDDLCallback) OnUpdateReorgInfo(job *model.Job, pid int64) { - if tc.OnUpdateReorgInfoExported != nil { - tc.OnUpdateReorgInfoExported(job, pid) - } -} diff --git a/pkg/ddl/util/callback/callback_test.go b/pkg/ddl/util/callback/callback_test.go index f1bf6ce3ee15c..d70e9d26a058e 100644 --- a/pkg/ddl/util/callback/callback_test.go +++ b/pkg/ddl/util/callback/callback_test.go @@ -15,17 +15,13 @@ package callback import ( - "context" "testing" "github.com/pingcap/tidb/pkg/ddl" - "github.com/stretchr/testify/require" ) func TestCallback(t *testing.T) { cb := &ddl.BaseCallback{} - require.Nil(t, cb.OnChanged(nil)) cb.OnJobRunBefore(nil) cb.OnJobUpdated(nil) - cb.OnWatched(context.TODO()) } diff --git a/pkg/domain/domain.go b/pkg/domain/domain.go index 1227451e24008..49866feb5b0fe 100644 --- a/pkg/domain/domain.go +++ b/pkg/domain/domain.go @@ -1264,7 +1264,7 @@ func newEtcdCli(addrs []string, ebd kv.EtcdBackend) (*clientv3.Client, error) { func (do *Domain) Init( ddlLease time.Duration, sysExecutorFactory func(*Domain) (pools.Resource, error), - ddlInjector func(ddl.DDL, ddl.Executor) *schematracker.Checker, + ddlInjector func(ddl.DDL, ddl.Executor, *infoschema.InfoCache) *schematracker.Checker, ) error { do.sysExecutorFactory = sysExecutorFactory perfschema.Init() @@ -1335,7 +1335,7 @@ func (do *Domain) Init( } }) if ddlInjector != nil { - checker := ddlInjector(do.ddl, do.ddlExecutor) + checker := ddlInjector(do.ddl, do.ddlExecutor, do.infoCache) checker.CreateTestDB(nil) do.ddl = checker do.ddlExecutor = checker diff --git a/pkg/session/tidb.go b/pkg/session/tidb.go index 73c1fe0c70e7d..5e5fadbdcf3a8 100644 --- a/pkg/session/tidb.go +++ b/pkg/session/tidb.go @@ -32,6 +32,7 @@ import ( "github.com/pingcap/tidb/pkg/domain" "github.com/pingcap/tidb/pkg/errno" "github.com/pingcap/tidb/pkg/executor" + "github.com/pingcap/tidb/pkg/infoschema" "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/parser" "github.com/pingcap/tidb/pkg/parser/ast" @@ -86,7 +87,7 @@ func (dm *domainMap) Get(store kv.Storage) (d *domain.Domain, err error) { sysFactory := createSessionWithDomainFunc(store) d = domain.NewDomain(store, ddlLease, statisticLease, planReplayerGCLease, factory) - var ddlInjector func(ddl.DDL, ddl.Executor) *schematracker.Checker + var ddlInjector func(ddl.DDL, ddl.Executor, *infoschema.InfoCache) *schematracker.Checker if injector, ok := store.(schematracker.StorageDDLInjector); ok { ddlInjector = injector.Injector }