Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl: refactor the args for the DDL type of renametable #55959

Merged
merged 13 commits into from
Sep 18, 2024
20 changes: 19 additions & 1 deletion pkg/ddl/ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,18 @@ func TestIgnorableSpec(t *testing.T) {
}

func TestBuildJobDependence(t *testing.T) {
vers := []model.JobVersion{
model.JobVersion1,
model.JobVersion2,
}

for _, ver := range vers {
model.SetJobVerInUse(ver)
joccau marked this conversation as resolved.
Show resolved Hide resolved
testBuildJobDependence(t)
}
}

func testBuildJobDependence(t *testing.T) {
store := createMockStore(t)
defer func() {
require.NoError(t, store.Close())
Expand All @@ -214,7 +226,13 @@ func TestBuildJobDependence(t *testing.T) {
job6 := &model.Job{ID: 6, TableID: 1, Type: model.ActionDropTable}
job7 := &model.Job{ID: 7, TableID: 2, Type: model.ActionModifyColumn}
job9 := &model.Job{ID: 9, SchemaID: 111, Type: model.ActionDropSchema}
job11 := &model.Job{ID: 11, TableID: 2, Type: model.ActionRenameTable, Args: []any{int64(111), "old db name"}}
job11 := &model.Job{ID: 11, TableID: 2, Type: model.ActionRenameTable, Version: model.GetJobVerInUse()}
job11.FillArgs(&model.RenameTableArgs{
OldSchemaID: 111,
NewTableName: pmodel.NewCIStr("new_table_name"),
SchemaName: pmodel.NewCIStr("old_db_name"),
})

err := kv.RunInNewTxn(ctx, store, false, func(ctx context.Context, txn kv.Transaction) error {
m := meta.NewMeta(txn)
require.NoError(t, m.EnQueueDDLJob(job1))
Expand Down
5 changes: 3 additions & 2 deletions pkg/ddl/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -4263,15 +4263,16 @@ func (e *executor) renameTable(ctx sessionctx.Context, oldIdent, newIdent ast.Id
Type: model.ActionRenameTable,
BinlogInfo: &model.HistoryInfo{},
CDCWriteSource: ctx.GetSessionVars().CDCWriteSource,
Args: []any{schemas[0].ID, newIdent.Name, schemas[0].Name},
CtxVars: []any{[]int64{schemas[0].ID, schemas[1].ID}, []int64{tableID}},
InvolvingSchemaInfo: []model.InvolvingSchemaInfo{
{Database: schemas[0].Name.L, Table: oldIdent.Name.L},
{Database: schemas[1].Name.L, Table: newIdent.Name.L},
},
SQLMode: ctx.GetSessionVars().SQLMode,
}

args := model.NewRenameTableArgs(schemas[0].ID, schemas[1].ID, schemas[0].Name, tableID, newIdent.Name)
job.FillArgs(args)
joccau marked this conversation as resolved.
Show resolved Hide resolved

err = e.DoDDLJob(ctx, job)
return errors.Trace(err)
}
Expand Down
11 changes: 8 additions & 3 deletions pkg/ddl/job_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -703,11 +703,16 @@ func job2UniqueIDs(job *model.Job, schema bool) string {
switch job.Type {
case model.ActionExchangeTablePartition, model.ActionRenameTables, model.ActionRenameTable:
var ids []int64
if schema {
ids = job.CtxVars[0].([]int64)
if job.Type == model.ActionRenameTable {
ids = model.GetRenameTableUniqueIDs(job, schema)
} else {
ids = job.CtxVars[1].([]int64)
if schema {
ids = job.CtxVars[0].([]int64)
} else {
ids = job.CtxVars[1].([]int64)
}
}

set := make(map[int64]struct{}, len(ids))
for _, id := range ids {
set[id] = struct{}{}
Expand Down
4 changes: 3 additions & 1 deletion pkg/ddl/schema_version.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,12 @@ func SetSchemaDiffForCreateView(diff *model.SchemaDiff, job *model.Job) error {

// SetSchemaDiffForRenameTable set SchemaDiff for ActionRenameTable.
func SetSchemaDiffForRenameTable(diff *model.SchemaDiff, job *model.Job) error {
err := job.DecodeArgs(&diff.OldSchemaID)
args, err := model.GetRenameTableArgs(job)
if err != nil {
return errors.Trace(err)
}

diff.OldSchemaID = args.OldSchemaID
diff.TableID = job.TableID
return nil
}
Expand Down
15 changes: 7 additions & 8 deletions pkg/ddl/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -746,10 +746,8 @@ func verifyNoOverflowShardBits(s *sess.Pool, tbl table.Table, shardRowIDBits uin
}

func onRenameTable(jobCtx *jobContext, t *meta.Meta, job *model.Job) (ver int64, _ error) {
var oldSchemaID int64
var oldSchemaName pmodel.CIStr
var tableName pmodel.CIStr
if err := job.DecodeArgs(&oldSchemaID, &tableName, &oldSchemaName); err != nil {
args, err := model.GetRenameTableArgs(job)
if err != nil {
// Invalid arguments, cancel this job.
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
Expand All @@ -759,25 +757,26 @@ func onRenameTable(jobCtx *jobContext, t *meta.Meta, job *model.Job) (ver int64,
return finishJobRenameTable(jobCtx, t, job)
}
newSchemaID := job.SchemaID
err := checkTableNotExists(jobCtx.infoCache, newSchemaID, tableName.L)
err = checkTableNotExists(jobCtx.infoCache, newSchemaID, args.NewTableName.L)
if err != nil {
if infoschema.ErrDatabaseNotExists.Equal(err) || infoschema.ErrTableExists.Equal(err) {
job.State = model.JobStateCancelled
}
return ver, errors.Trace(err)
}

tblInfo, err := GetTableInfoAndCancelFaultJob(t, job, oldSchemaID)
tblInfo, err := GetTableInfoAndCancelFaultJob(t, job, args.OldSchemaID)
if err != nil {
return ver, errors.Trace(err)
}
oldTableName := tblInfo.Name
ver, err = checkAndRenameTables(t, job, tblInfo, oldSchemaID, job.SchemaID, &oldSchemaName, &tableName)
ver, err = checkAndRenameTables(t, job, tblInfo, args.OldSchemaID, job.SchemaID, &args.SchemaName, &args.NewTableName)
if err != nil {
return ver, errors.Trace(err)
}
fkh := newForeignKeyHelper()
err = adjustForeignKeyChildTableInfoAfterRenameTable(jobCtx.infoCache, t, job, &fkh, tblInfo, oldSchemaName, oldTableName, tableName, newSchemaID)
err = adjustForeignKeyChildTableInfoAfterRenameTable(jobCtx.infoCache, t,
job, &fkh, tblInfo, args.SchemaName, oldTableName, args.NewTableName, newSchemaID)
if err != nil {
return ver, errors.Trace(err)
}
Expand Down
21 changes: 16 additions & 5 deletions pkg/ddl/table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,17 +52,26 @@ func testRenameTable(
tblInfo *model.TableInfo,
) *model.Job {
job := &model.Job{
SchemaID: newSchemaID,
TableID: tblInfo.ID,
Type: model.ActionRenameTable,
SchemaID: newSchemaID,
TableID: tblInfo.ID,
Type: model.ActionRenameTable,
Version: model.GetJobVerInUse(),

BinlogInfo: &model.HistoryInfo{},
Args: []any{oldSchemaID, tblInfo.Name, oldSchemaName},
CtxVars: []any{[]int64{oldSchemaID, newSchemaID}, []int64{tblInfo.ID}},
InvolvingSchemaInfo: []model.InvolvingSchemaInfo{
{Database: oldSchemaName.L, Table: tblInfo.Name.L},
{Database: newSchemaName.L, Table: tblInfo.Name.L},
},
}
args := model.RenameTableArgs{
OldSchemaID: oldSchemaID,
SchemaName: oldSchemaName,
NewTableName: tblInfo.Name,
SchemaIDs: []int64{oldSchemaID, newSchemaID},
TableIDs: []int64{tblInfo.ID},
}
job.FillArgs(&args)
joccau marked this conversation as resolved.
Show resolved Hide resolved

ctx.SetValue(sessionctx.QueryString, "skip")
require.NoError(t, d.DoDDLJobWrapper(ctx, ddl.NewJobWrapper(job, true)))

Expand All @@ -76,6 +85,7 @@ func testRenameTable(
func testRenameTables(t *testing.T, ctx sessionctx.Context, d ddl.ExecutorForTest, oldSchemaIDs, newSchemaIDs []int64, newTableNames []*pmodel.CIStr, oldTableIDs []int64, oldSchemaNames, oldTableNames []*pmodel.CIStr) *model.Job {
job := &model.Job{
Type: model.ActionRenameTables,
Version: model.GetJobVerInUse(),
joccau marked this conversation as resolved.
Show resolved Hide resolved
BinlogInfo: &model.HistoryInfo{},
Args: []any{oldSchemaIDs, newSchemaIDs, newTableNames, oldTableIDs, oldSchemaNames, oldTableNames},
CtxVars: []any{append(oldSchemaIDs, newSchemaIDs...), oldTableIDs},
Expand All @@ -84,6 +94,7 @@ func testRenameTables(t *testing.T, ctx sessionctx.Context, d ddl.ExecutorForTes
{Database: oldSchemaNames[0].L, Table: newTableNames[0].L},
},
}

ctx.SetValue(sessionctx.QueryString, "skip")
require.NoError(t, d.DoDDLJobWrapper(ctx, ddl.NewJobWrapper(job, true)))

Expand Down
7 changes: 4 additions & 3 deletions pkg/meta/model/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -581,11 +581,12 @@ func (job *Job) hasDependentSchema(other *Job) (bool, error) {
return true, nil
}
if job.Type == ActionRenameTable {
var oldSchemaID int64
if err := job.DecodeArgs(&oldSchemaID); err != nil {
args, err := GetRenameTableArgs(job)
if err != nil {
return false, errors.Trace(err)
}
if other.SchemaID == oldSchemaID {

if other.SchemaID == args.OldSchemaID {
return true, nil
}
}
Expand Down
96 changes: 96 additions & 0 deletions pkg/meta/model/job_args.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"encoding/json"

"github.com/pingcap/errors"
pmodel "github.com/pingcap/tidb/pkg/parser/model"
)

// getOrDecodeArgsV2 get the argsV2 from job, if the argsV2 is nil, decode rawArgsV2
Expand Down Expand Up @@ -109,3 +110,98 @@ func GetTruncateTableArgsAfterRun(job *Job) (*TruncateTableArgs, error) {
}
return argsV2, nil
}

// RenameTableArgs is the arguements for rename table DDL job.
type RenameTableArgs struct {
// for Args
OldSchemaID int64 `json:"old_schema_id,omitempty"`
SchemaName pmodel.CIStr `json:"schema_name,omitempty"`
NewTableName pmodel.CIStr `json:"new_table_name,omitempty"`

// for CtxVars
// SchemaIDs contains {oldSchemaID, newSchemaID}
SchemaIDs []int64 `json:"-"`
// TableIDs contains {tableID}
TableIDs []int64 `json:"-"`
joccau marked this conversation as resolved.
Show resolved Hide resolved
}

func (rt *RenameTableArgs) fillJob(job *Job) {
if job.Version == JobVersion1 {
job.Args = []any{rt.OldSchemaID, rt.NewTableName, rt.SchemaName}
job.CtxVars = []any{rt.SchemaIDs, rt.TableIDs}
} else {
job.ArgsV2 = rt
}
}

// NewRenameTableArgs creates a struct with given parameters.
func NewRenameTableArgs(
oldSchemaID int64, newSchemaID int64, schemaName pmodel.CIStr,
tableID int64, newTableName pmodel.CIStr,
) *RenameTableArgs {
return &RenameTableArgs{
OldSchemaID: oldSchemaID,
SchemaName: schemaName,
NewTableName: newTableName,
SchemaIDs: []int64{oldSchemaID, newSchemaID},
TableIDs: []int64{tableID},
}
}

// GetRenameTableArgs get the arguements from job.
func GetRenameTableArgs(job *Job) (*RenameTableArgs, error) {
if job.Version == JobVersion1 {
var (
oldSchemaID int64
schemaName pmodel.CIStr
newTableName pmodel.CIStr
)

// decode args and cache in args.
if len(job.Args) == 0 {
err := job.DecodeArgs(&oldSchemaID, &newTableName, &schemaName)
if err != nil {
return nil, errors.Trace(err)
}
job.Args = []any{oldSchemaID, newTableName, schemaName}
} else {
oldSchemaID = job.Args[0].(int64)
schemaName = job.Args[1].(pmodel.CIStr)
newTableName = job.Args[2].(pmodel.CIStr)
}

args := RenameTableArgs{
OldSchemaID: oldSchemaID,
SchemaName: schemaName,
NewTableName: newTableName,
}
if len(job.CtxVars) > 0 {
args.SchemaIDs = job.CtxVars[0].([]int64)
args.TableIDs = job.CtxVars[1].([]int64)
}
return &args, nil
}

argsV2, err := getOrDecodeArgsV2[*RenameTableArgs](job)
if err != nil {
return nil, errors.Trace(err)
}
return argsV2, err
}

// GetRenameTableUniqueIDs gets unique IDs from job for rename table type.
func GetRenameTableUniqueIDs(job *Job, schema bool) []int64 {
args := &RenameTableArgs{}

if job.Version == JobVersion1 {
args.SchemaIDs = job.CtxVars[0].([]int64)
joccau marked this conversation as resolved.
Show resolved Hide resolved
args.TableIDs = job.CtxVars[1].([]int64)
} else {
args = job.ArgsV2.(*RenameTableArgs)
}

if schema {
return args.SchemaIDs
}
return args.TableIDs
}
8 changes: 6 additions & 2 deletions pkg/meta/model/job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,14 +75,18 @@ func TestJobCodec(t *testing.T) {
// job1: table ID is 2
var err error
job1 := &Job{
Version: JobVersion1,
Version: GetJobVerInUse(),
ID: 2,
TableID: 2,
SchemaID: 1,
Type: ActionRenameTable,
BinlogInfo: &HistoryInfo{},
Args: []any{int64(3), model.NewCIStr("new_table_name")},
}
job1.FillArgs(&RenameTableArgs{
OldSchemaID: 3,
NewTableName: model.NewCIStr("new_table_name"),
})

job1.RawArgs, err = json.Marshal(job1.Args)
require.NoError(t, err)
isDependent, err := job.IsDependentOn(job1)
Expand Down