Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl: refactor V2 job args of dropping column DDL. #56021

Merged
merged 19 commits into from
Sep 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 14 additions & 7 deletions pkg/ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,12 @@ func onDropColumn(jobCtx *jobContext, t *meta.Meta, job *model.Job) (ver int64,
if err != nil {
return ver, errors.Trace(err)
}
job.Args = append(job.Args, indexInfosToIDList(idxInfos))
dropColumnArgs, err := model.GetDropColumnArgs(job)
if err != nil {
return ver, errors.Trace(err)
}
dropColumnArgs.IndexIDs = indexInfosToIDList(idxInfos)
job.FillArgs(dropColumnArgs)
case model.StateDeleteOnly:
// delete only -> reorganization
colInfo.State = model.StateDeleteReorganization
Expand All @@ -215,7 +220,12 @@ func onDropColumn(jobCtx *jobContext, t *meta.Meta, job *model.Job) (ver int64,
} else {
// We should set related index IDs for job
job.FinishTableJob(model.JobStateDone, model.StateNone, ver, tblInfo)
job.Args = append(job.Args, getPartitionIDs(tblInfo))
dropColumnArgs, err := model.GetDropColumnArgs(job)
if err != nil {
return ver, errors.Trace(err)
}
dropColumnArgs.PartitionIDs = getPartitionIDs(tblInfo)
job.FillArgs(dropColumnArgs)
}
default:
return ver, errors.Trace(dbterror.ErrInvalidDDLJob.GenWithStackByArgs("table", tblInfo.State))
Expand All @@ -231,16 +241,13 @@ func checkDropColumn(jobCtx *jobContext, t *meta.Meta, job *model.Job) (*model.T
return nil, nil, nil, false, errors.Trace(err)
}

var colName pmodel.CIStr
var ifExists bool
// indexIDs is used to make sure we don't truncate args when decoding the rawArgs.
var indexIDs []int64
err = job.DecodeArgs(&colName, &ifExists, &indexIDs)
dropColumnArgs, err := model.GetDropColumnArgs(job)
if err != nil {
job.State = model.JobStateCancelled
return nil, nil, nil, false, errors.Trace(err)
}

colName, ifExists := dropColumnArgs.ColName, dropColumnArgs.IfExists
colInfo := model.FindColumnInfo(tblInfo.Columns, colName.L)
if colInfo == nil || colInfo.Hidden {
job.State = model.JobStateCancelled
Expand Down
14 changes: 7 additions & 7 deletions pkg/ddl/ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,13 +208,13 @@ func TestBuildJobDependence(t *testing.T) {
}()
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL)
// Add some non-add-index jobs.
job1 := &model.Job{ID: 1, TableID: 1, Type: model.ActionAddColumn}
job2 := &model.Job{ID: 2, TableID: 1, Type: model.ActionCreateTable}
job3 := &model.Job{ID: 3, TableID: 2, Type: model.ActionDropColumn}
job6 := &model.Job{ID: 6, TableID: 1, Type: model.ActionDropTable}
job7 := &model.Job{ID: 7, TableID: 2, Type: model.ActionModifyColumn}
job9 := &model.Job{ID: 9, SchemaID: 111, Type: model.ActionDropSchema}
job11 := &model.Job{Version: model.JobVersion1, ID: 11, TableID: 2, Type: model.ActionRenameTable, Args: []any{int64(111), "old db name"}}
job1 := &model.Job{ID: 1, TableID: 1, Version: model.JobVersion1, Type: model.ActionAddColumn}
job2 := &model.Job{ID: 2, TableID: 1, Version: model.JobVersion1, Type: model.ActionCreateTable}
job3 := &model.Job{ID: 3, TableID: 2, Version: model.JobVersion1, Type: model.ActionDropColumn}
job6 := &model.Job{ID: 6, TableID: 1, Version: model.JobVersion1, Type: model.ActionDropTable}
job7 := &model.Job{ID: 7, TableID: 2, Version: model.JobVersion1, Type: model.ActionModifyColumn}
job9 := &model.Job{ID: 9, SchemaID: 111, Version: model.JobVersion1, Type: model.ActionDropSchema}
job11 := &model.Job{ID: 11, TableID: 2, Version: model.JobVersion1, Type: model.ActionRenameTable, Args: []any{int64(111), "old db name"}}
err := kv.RunInNewTxn(ctx, store, false, func(ctx context.Context, txn kv.Transaction) error {
m := meta.NewMeta(txn)
require.NoError(t, m.EnQueueDDLJob(job1))
Expand Down
19 changes: 8 additions & 11 deletions pkg/ddl/delete_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"github.com/pingcap/tidb/pkg/ddl/util"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/meta/model"
pmodel "github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/parser/terror"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/tablecodec"
Expand Down Expand Up @@ -400,19 +399,17 @@ func insertJobIntoDeleteRangeTable(ctx context.Context, wrapper DelRangeExecWrap
}
}
case model.ActionDropColumn:
var colName pmodel.CIStr
var ifExists bool
var indexIDs []int64
var partitionIDs []int64
if err := job.DecodeArgs(&colName, &ifExists, &indexIDs, &partitionIDs); err != nil {
args, err := model.GetDropColumnArgs(job)
if err != nil {
return errors.Trace(err)
}
if len(indexIDs) > 0 {
if len(partitionIDs) == 0 {
return errors.Trace(doBatchDeleteIndiceRange(ctx, wrapper, job.ID, job.TableID, indexIDs, ea, "drop column: table ID"))

if len(args.IndexIDs) > 0 {
if len(args.PartitionIDs) == 0 {
return errors.Trace(doBatchDeleteIndiceRange(ctx, wrapper, job.ID, job.TableID, args.IndexIDs, ea, "drop column: table ID"))
}
for _, pid := range partitionIDs {
if err := doBatchDeleteIndiceRange(ctx, wrapper, job.ID, pid, indexIDs, ea, "drop column: partition table ID"); err != nil {
for _, pid := range args.PartitionIDs {
if err := doBatchDeleteIndiceRange(ctx, wrapper, job.ID, pid, args.IndexIDs, ea, "drop column: partition table ID"); err != nil {
return errors.Trace(err)
}
}
Expand Down
15 changes: 11 additions & 4 deletions pkg/ddl/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -3159,19 +3159,26 @@ func (e *executor) DropColumn(ctx sessionctx.Context, ti ast.Ident, spec *ast.Al
}

job := &model.Job{
// TODO(joccau): set Version = model.GetJobVerInUse() once
// actionMultiSchemaChange has been refactored.
Version: model.JobVersion1,
SchemaID: schema.ID,
TableID: t.Meta().ID,
SchemaName: schema.Name.L,
SchemaState: model.StatePublic,
TableName: t.Meta().Name.L,
Type: model.ActionDropColumn,
BinlogInfo: &model.HistoryInfo{},
Args: []any{colName, spec.IfExists},
CDCWriteSource: ctx.GetSessionVars().CDCWriteSource,
SQLMode: ctx.GetSessionVars().SQLMode,
}

err = e.DoDDLJob(ctx, job)
args := &model.DropColumnArgs{
ColName: colName,
IfExists: spec.IfExists,
}
// We need to fill the args here because, in a multi-schema change, this job is turned into a sub-job that copies Args and RawArgs from it.
job.FillArgs(args)
err = e.doDDLJob2(ctx, job, args)
return errors.Trace(err)
}

Expand Down Expand Up @@ -6280,7 +6287,7 @@ func (e *executor) DoDDLJobWrapper(ctx sessionctx.Context, jobW *JobWrapper) (re
if mci := ctx.GetSessionVars().StmtCtx.MultiSchemaInfo; mci != nil {
// In multiple schema change, we don't run the job.
// Instead, we merge all the jobs into one pending job.
return appendToSubJobs(mci, job)
return appendToSubJobs(mci, jobW)
}
// Get a global job ID and put the DDL job in the queue.
setDDLJobQuery(ctx, job)
Expand Down
24 changes: 12 additions & 12 deletions pkg/ddl/multi_schema_change.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,30 +174,30 @@ func handleRollbackException(runJobErr error, proxyJobErr *terror.Error) error {
return nil
}

func appendToSubJobs(m *model.MultiSchemaInfo, job *model.Job) error {
err := fillMultiSchemaInfo(m, job)
func appendToSubJobs(m *model.MultiSchemaInfo, jobW *JobWrapper) error {
err := fillMultiSchemaInfo(m, jobW)
if err != nil {
return err
}
var reorgTp model.ReorgType
if job.ReorgMeta != nil {
reorgTp = job.ReorgMeta.ReorgTp
if jobW.ReorgMeta != nil {
reorgTp = jobW.ReorgMeta.ReorgTp
}
m.SubJobs = append(m.SubJobs, &model.SubJob{
Type: job.Type,
Args: job.Args,
RawArgs: job.RawArgs,
SchemaState: job.SchemaState,
SnapshotVer: job.SnapshotVer,
Type: jobW.Type,
Args: jobW.Args,
RawArgs: jobW.RawArgs,
SchemaState: jobW.SchemaState,
SnapshotVer: jobW.SnapshotVer,
Revertible: true,
CtxVars: job.CtxVars,
CtxVars: jobW.CtxVars,
ReorgTp: reorgTp,
UseCloud: false,
})
return nil
}

func fillMultiSchemaInfo(info *model.MultiSchemaInfo, job *model.Job) (err error) {
func fillMultiSchemaInfo(info *model.MultiSchemaInfo, job *JobWrapper) error {
switch job.Type {
case model.ActionAddColumn:
col := job.Args[0].(*table.Column)
Expand All @@ -210,7 +210,7 @@ func fillMultiSchemaInfo(info *model.MultiSchemaInfo, job *model.Job) (err error
info.PositionColumns = append(info.PositionColumns, pos.RelativeColumn.Name)
}
case model.ActionDropColumn:
colName := job.Args[0].(pmodel.CIStr)
colName := job.JobArgs.(*model.DropColumnArgs).ColName
info.DropColumns = append(info.DropColumns, colName)
case model.ActionDropIndex, model.ActionDropPrimaryKey:
indexName := job.Args[0].(pmodel.CIStr)
Expand Down
13 changes: 5 additions & 8 deletions pkg/ddl/sanity_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/parser"
"github.com/pingcap/tidb/pkg/parser/ast"
pmodel "github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/util/intest"
"github.com/pingcap/tidb/pkg/util/mathutil"
Expand Down Expand Up @@ -152,15 +151,13 @@ func expectedDeleteRangeCnt(ctx delRangeCntCtx, job *model.Job) (int, error) {
}
return mathutil.Max(len(partitionIDs), 1), nil
case model.ActionDropColumn:
var colName pmodel.CIStr
var ifExists bool
var indexIDs []int64
var partitionIDs []int64
if err := job.DecodeArgs(&colName, &ifExists, &indexIDs, &partitionIDs); err != nil {
args, err := model.GetDropColumnArgs(job)
if err != nil {
return 0, errors.Trace(err)
}
physicalCnt := mathutil.Max(len(partitionIDs), 1)
return physicalCnt * len(indexIDs), nil

physicalCnt := mathutil.Max(len(args.PartitionIDs), 1)
return physicalCnt * len(args.IndexIDs), nil
case model.ActionModifyColumn:
var indexIDs []int64
var partitionIDs []int64
Expand Down
2 changes: 1 addition & 1 deletion pkg/meta/model/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ go_test(
],
embed = [":model"],
flaky = True,
shard_count = 34,
shard_count = 35,
deps = [
"//pkg/parser/charset",
"//pkg/parser/model",
Expand Down
58 changes: 32 additions & 26 deletions pkg/meta/model/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -490,39 +490,45 @@ func (job *Job) FillFinishedArgs(args FinishedJobArgs) {
args.fillFinishedJob(job)
}

// marshalArgs encodes job arguments as JSON using the layout of the given
// job version.
//
// For JobVersion1 and earlier the whole args slice is marshaled as a JSON
// array. For later versions only the single element of args is marshaled
// (JSON null when args is empty). Any marshal error is returned traced.
func marshalArgs(jobVer JobVersion, args []any) (json.RawMessage, error) {
	// Default to the v1 layout: the complete positional argument list.
	var payload any = args
	if jobVer > JobVersion1 {
		intest.Assert(jobVer == JobVersion2, "job version is not v2")
		// v2 layout: at most one args object; nothing to encode when absent.
		payload = nil
		if len(args) > 0 {
			intest.Assert(len(args) == 1, "Job.Args should have only one element")
			payload = args[0]
		}
	}

	encoded, err := json.Marshal(payload)
	return encoded, errors.Trace(err)
}

// Encode encodes job with json format.
// updateRawArgs is used to determine whether to update the raw args.
func (job *Job) Encode(updateRawArgs bool) ([]byte, error) {
var err error
if updateRawArgs {
if job.Version == JobVersion1 {
job.RawArgs, err = json.Marshal(job.Args)
if err != nil {
return nil, errors.Trace(err)
}
if job.MultiSchemaInfo != nil {
for _, sub := range job.MultiSchemaInfo.SubJobs {
// Only update the args of executing sub-jobs.
if sub.Args == nil {
continue
}
sub.RawArgs, err = json.Marshal(sub.Args)
if err != nil {
return nil, errors.Trace(err)
}
job.RawArgs, err = marshalArgs(job.Version, job.Args)
if err != nil {
return nil, errors.Trace(err)
}

if job.MultiSchemaInfo != nil {
for _, sub := range job.MultiSchemaInfo.SubJobs {
// Only update the args of executing sub-jobs.
if sub.Args == nil {
continue
}

sub.RawArgs, err = marshalArgs(job.Version, sub.Args)
if err != nil {
return nil, errors.Trace(err)
}
}
} else {
var arg any
if len(job.Args) > 0 {
intest.Assert(len(job.Args) == 1, "Job.Args should have only one element")
arg = job.Args[0]
}
job.RawArgs, err = json.Marshal(arg)
if err != nil {
return nil, errors.Trace(err)
}
// TODO remember update sub-jobs' RawArgs when we do it.
}
}

Expand Down
43 changes: 43 additions & 0 deletions pkg/meta/model/job_args.go
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,49 @@ func GetResourceGroupArgs(job *Job) (*ResourceGroupArgs, error) {
return getOrDecodeArgsV2[*ResourceGroupArgs](job)
}

// DropColumnArgs holds the arguments of a drop-column DDL job.
// ColName and IfExists are supplied when the job is created; IndexIDs and
// PartitionIDs are filled in later, while the job is running.
type DropColumnArgs struct {
// ColName is the name of the column to drop.
ColName pmodel.CIStr `json:"column_name,omitempty"`
// IfExists carries the IfExists flag taken from the ALTER TABLE spec.
IfExists bool `json:"if_exists,omitempty"`
// The two fields below are filled in while the job is running.
// IndexIDs are the IDs of the indexes collected for the dropped column;
// PartitionIDs are the partition IDs of the table (presumably empty for a
// non-partitioned table — confirm against getPartitionIDs).
IndexIDs []int64 `json:"index_ids,omitempty"`
PartitionIDs []int64 `json:"partition_ids,omitempty"`
}

// fillJob writes the drop-column arguments into job.Args using the layout
// that matches the job's version: the args struct itself for versions after
// JobVersion1, or the flattened positional list for JobVersion1 and earlier.
func (a *DropColumnArgs) fillJob(job *Job) {
	if job.Version > JobVersion1 {
		job.Args = []any{a}
		return
	}
	// Positional order must match what GetDropColumnArgs decodes for v1 jobs.
	job.Args = []any{a.ColName, a.IfExists, a.IndexIDs, a.PartitionIDs}
}

// GetDropColumnArgs returns the arguments of a drop-column DDL job.
//
// Jobs newer than JobVersion1 are handled by getOrDecodeArgsV2. For
// JobVersion1 and earlier, the positional arguments (column name, if-exists
// flag, index IDs, partition IDs) are decoded via job.DecodeArgs and packed
// into a DropColumnArgs. A decode failure is returned traced, with nil args.
func GetDropColumnArgs(job *Job) (*DropColumnArgs, error) {
	if job.Version > JobVersion1 {
		return getOrDecodeArgsV2[*DropColumnArgs](job)
	}

	// Decode the v1 positional arguments straight into the result's fields.
	result := &DropColumnArgs{}
	if err := job.DecodeArgs(
		&result.ColName,
		&result.IfExists,
		&result.IndexIDs,
		&result.PartitionIDs,
	); err != nil {
		return nil, errors.Trace(err)
	}
	return result, nil
}

// RenameTablesArgs is the arguments for rename tables job.
type RenameTablesArgs struct {
RenameTableInfos []*RenameTableArgs `json:"rename_table_infos,omitempty"`
Expand Down
17 changes: 17 additions & 0 deletions pkg/meta/model/job_args_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,3 +313,20 @@ func TestResourceGroupArgs(t *testing.T) {
}
}
}

// TestDropColumnArgs checks that DropColumnArgs survives an encode/decode
// round trip through a Job for both JobVersion1 and JobVersion2.
func TestDropColumnArgs(t *testing.T) {
	expected := &DropColumnArgs{
		ColName:      model.NewCIStr("col_name"),
		IfExists:     true,
		IndexIDs:     []int64{1, 2, 3},
		PartitionIDs: []int64{4, 5, 6},
	}

	versions := []JobVersion{JobVersion1, JobVersion2}
	for _, ver := range versions {
		// Encode a job carrying the args, then decode it into a fresh Job.
		encoded := getJobBytes(t, expected, ver, ActionDropColumn)
		decoded := &Job{}
		require.NoError(t, decoded.Decode(encoded))

		got, err := GetDropColumnArgs(decoded)
		require.NoError(t, err)
		require.Equal(t, expected, got)
	}
}