Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl: decouple executor part out from ddl #54858

Merged
merged 7 commits into from
Jul 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion br/pkg/gluetidb/glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ func (gs *tidbSession) CreateDatabase(ctx context.Context, schema *model.DBInfo)

// CreatePlacementPolicy implements glue.Session.
func (gs *tidbSession) CreatePlacementPolicy(ctx context.Context, policy *model.PolicyInfo) error {
d := domain.GetDomain(gs.se).DDL()
d := domain.GetDomain(gs.se).DDLExecutor()
gs.se.SetValue(sessionctx.QueryString, gs.showCreatePlacementPolicy(policy))
// the default behaviour is ignoring duplicated policy during restore.
return d.CreatePlacementPolicyWithInfo(gs.se, policy, ddl.OnExistIgnore)
Expand Down
3 changes: 1 addition & 2 deletions pkg/ddl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ go_library(
"constraint.go",
"ddl.go",
"ddl_algorithm.go",
"ddl_api.go",
"ddl_history.go",
"ddl_running_jobs.go",
"ddl_tiflash_api.go",
Expand All @@ -38,6 +37,7 @@ go_library(
"delete_range_util.go",
"dist_owner.go",
"doc.go",
"executor.go",
"foreign_key.go",
"generated_column.go",
"index.go",
Expand Down Expand Up @@ -220,7 +220,6 @@ go_test(
"db_table_test.go",
"db_test.go",
"ddl_algorithm_test.go",
"ddl_api_test.go",
"ddl_error_test.go",
"ddl_history_test.go",
"ddl_running_jobs_test.go",
Expand Down
2 changes: 1 addition & 1 deletion pkg/ddl/backfilling_dist_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func (s *backfillDistExecutor) newBackfillSubtaskExecutor(

// TODO getTableByTxn is using DDL ctx which is never cancelled except when shutdown.
// we should move this operation out of GetStepExecutor, and put into Init.
_, tblIface, err := ddlObj.getTableByTxn((*asAutoIDRequirement)(ddlObj.ddlCtx), jobMeta.SchemaID, jobMeta.TableID)
_, tblIface, err := ddlObj.getTableByTxn(ddlObj.ddlCtx.getAutoIDRequirement(), jobMeta.SchemaID, jobMeta.TableID)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/ddl/backfilling_dist_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ func generateNonPartitionPlan(
useCloud bool,
instanceCnt int,
) (metas [][]byte, err error) {
tbl, err := getTable((*asAutoIDRequirement)(d.ddlCtx), job.SchemaID, tblInfo)
tbl, err := getTable(d.ddlCtx.getAutoIDRequirement(), job.SchemaID, tblInfo)
if err != nil {
return nil, err
}
Expand Down
16 changes: 13 additions & 3 deletions pkg/ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -764,7 +764,7 @@ func (w *worker) doModifyColumnTypeWithData(
job.SnapshotVer = 0
job.SchemaState = model.StateWriteReorganization
case model.StateWriteReorganization:
tbl, err := getTable((*asAutoIDRequirement)(d), dbInfo.ID, tblInfo)
tbl, err := getTable(d.getAutoIDRequirement(), dbInfo.ID, tblInfo)
if err != nil {
return ver, errors.Trace(err)
}
Expand Down Expand Up @@ -1672,7 +1672,17 @@ func checkNewAutoRandomBits(idAccessors meta.AutoIDAccessors, oldCol *model.Colu
return nil
}

type asAutoIDRequirement ddlCtx
func (d *ddlCtx) getAutoIDRequirement() autoid.Requirement {
return &asAutoIDRequirement{
store: d.store,
autoidCli: d.autoidCli,
}
}

type asAutoIDRequirement struct {
store kv.Storage
autoidCli *autoid.ClientDiscover
}

var _ autoid.Requirement = &asAutoIDRequirement{}

Expand All @@ -1693,7 +1703,7 @@ func applyNewAutoRandomBits(d *ddlCtx, m *meta.Meta, dbInfo *model.DBInfo,
if !needMigrateFromAutoIncToAutoRand {
return nil
}
autoRandAlloc := autoid.NewAllocatorsFromTblInfo((*asAutoIDRequirement)(d), dbInfo.ID, tblInfo).Get(autoid.AutoRandomType)
autoRandAlloc := autoid.NewAllocatorsFromTblInfo(d.getAutoIDRequirement(), dbInfo.ID, tblInfo).Get(autoid.AutoRandomType)
if autoRandAlloc == nil {
errMsg := fmt.Sprintf(autoid.AutoRandomAllocatorNotFound, dbInfo.Name.O, tblInfo.Name.O)
return dbterror.ErrInvalidAutoRandom.GenWithStackByArgs(errMsg)
Expand Down
4 changes: 2 additions & 2 deletions pkg/ddl/db_change_failpoints_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,12 +105,12 @@ func TestParallelUpdateTableReplica(t *testing.T) {
var wg util.WaitGroupWrapper
wg.Run(func() {
// Mock for table tiflash replica was available.
err1 = domain.GetDomain(tk1.Session()).DDL().UpdateTableReplicaInfo(tk1.Session(), t1.Meta().ID, true)
err1 = domain.GetDomain(tk1.Session()).DDLExecutor().UpdateTableReplicaInfo(tk1.Session(), t1.Meta().ID, true)
})
wg.Run(func() {
<-ch
// Mock for table tiflash replica was available.
err2 = domain.GetDomain(tk2.Session()).DDL().UpdateTableReplicaInfo(tk2.Session(), t1.Meta().ID, true)
err2 = domain.GetDomain(tk2.Session()).DDLExecutor().UpdateTableReplicaInfo(tk2.Session(), t1.Meta().ID, true)
})
wg.Wait()
require.NoError(t, err1)
Expand Down
10 changes: 3 additions & 7 deletions pkg/ddl/db_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"github.com/pingcap/tidb/pkg/ddl/util/callback"
"github.com/pingcap/tidb/pkg/domain"
"github.com/pingcap/tidb/pkg/executor"
"github.com/pingcap/tidb/pkg/infoschema"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/parser"
"github.com/pingcap/tidb/pkg/parser/ast"
Expand All @@ -39,6 +38,7 @@ import (
"github.com/pingcap/tidb/pkg/sessiontxn"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/pingcap/tidb/pkg/testkit/external"
"github.com/pingcap/tidb/pkg/testkit/testfailpoint"
"github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/sqlexec"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -1527,24 +1527,20 @@ func TestParallelDDLBeforeRunDDLJob(t *testing.T) {
sessionToStart.Add(2)
firstDDLFinished := make(chan struct{})

intercept.OnGetInfoSchemaExported = func(ctx sessionctx.Context, is infoschema.InfoSchema) infoschema.InfoSchema {
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/afterGetSchemaAndTableByIdent", func(ctx sessionctx.Context) {
// The following code is for testing.
// Make sure the two sessions get the same information schema before executing DDL.
// After the first session executes its DDL, then the second session executes its DDL.
var info infoschema.InfoSchema
sessionToStart.Done()
sessionToStart.Wait()
info = is

// Make sure the two session have got the same information schema. And the first session can continue to go on,
// or the first session finished this SQL(seCnt = finishedCnt), then other sessions can continue to go on.
currID := ctx.GetSessionVars().ConnectionID
if currID != 1 {
<-firstDDLFinished
}

return info
}
})
d := dom.DDL()
d.(ddl.DDLForTest).SetInterceptor(intercept)

Expand Down
4 changes: 2 additions & 2 deletions pkg/ddl/db_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ func TestCreateTableWithInfo(t *testing.T) {
tk.MustExec("use test")
tk.Session().SetValue(sessionctx.QueryString, "skip")

d := dom.DDL()
d := dom.DDLExecutor()
require.NotNil(t, d)
info := []*model.TableInfo{{
ID: 42042, // Note, we must ensure the table ID is globally unique!
Expand Down Expand Up @@ -356,7 +356,7 @@ func TestBatchCreateTable(t *testing.T) {
tk.MustExec("drop table if exists tables_2")
tk.MustExec("drop table if exists tables_3")

d := dom.DDL()
d := dom.DDLExecutor()
infos := []*model.TableInfo{}
infos = append(infos, &model.TableInfo{
Name: model.NewCIStr("tables_1"),
Expand Down
Loading