From 89a55716191e9f3111eb04b6b68efff122bc899a Mon Sep 17 00:00:00 2001 From: Ruihao Chen Date: Sat, 12 Oct 2024 20:41:59 +0800 Subject: [PATCH] ddl: args v2 for add/drop/rename index, include PK/vector-index (#56130) ref pingcap/tidb#53930 --- br/pkg/restore/ingestrec/ingest_recorder.go | 16 +- .../restore/ingestrec/ingest_recorder_test.go | 10 +- pkg/ddl/BUILD.bazel | 1 - pkg/ddl/backfilling_dist_executor.go | 14 +- pkg/ddl/bdr.go | 10 +- pkg/ddl/bdr_test.go | 59 ++- pkg/ddl/delete_range.go | 35 +- pkg/ddl/executor.go | 87 ++++- pkg/ddl/index.go | 180 ++++----- pkg/ddl/index_test.go | 153 -------- pkg/ddl/job_submitter.go | 4 +- pkg/ddl/job_worker.go | 4 +- pkg/ddl/multi_schema_change.go | 56 +-- pkg/ddl/rollingback.go | 46 +-- pkg/ddl/sanity_check.go | 34 +- pkg/ddl/stat_test.go | 4 +- pkg/meta/model/job.go | 18 +- pkg/meta/model/job_args.go | 369 ++++++++++++++++++ pkg/meta/model/job_args_test.go | 140 +++++++ 19 files changed, 810 insertions(+), 430 deletions(-) delete mode 100644 pkg/ddl/index_test.go diff --git a/br/pkg/restore/ingestrec/ingest_recorder.go b/br/pkg/restore/ingestrec/ingest_recorder.go index 1ffa40063bf19..9c0226a746796 100644 --- a/br/pkg/restore/ingestrec/ingest_recorder.go +++ b/br/pkg/restore/ingestrec/ingest_recorder.go @@ -94,15 +94,9 @@ func (i *IngestRecorder) TryAddJob(job *model.Job, isSubJob bool) error { return nil } - allIndexIDs := make([]int64, 1) - // The job.Args is either `Multi-index: ([]int64, ...)`, - // or `Single-index: (int64, ...)`. - // TODO: it's better to use the public function to parse the - // job's Args. - if err := job.DecodeArgs(&allIndexIDs[0]); err != nil { - if err = job.DecodeArgs(&allIndexIDs); err != nil { - return errors.Trace(err) - } + args, err := model.GetFinishedModifyIndexArgs(job) + if err != nil { + return errors.Trace(err) } tableindexes, exists := i.items[job.TableID] @@ -113,8 +107,8 @@ func (i *IngestRecorder) TryAddJob(job *model.Job, isSubJob bool) error { // the current information of table/index might be modified by other ddl jobs, // therefore update the index information at last - for _, indexID := range allIndexIDs { - tableindexes[indexID] = &IngestIndexInfo{ + for _, a := range args.IndexArgs { + tableindexes[a.IndexID] = &IngestIndexInfo{ IsPrimary: job.Type == model.ActionAddPrimaryKey, Updated: false, } diff --git a/br/pkg/restore/ingestrec/ingest_recorder_test.go b/br/pkg/restore/ingestrec/ingest_recorder_test.go index 23225da27d8de..8be133914580a 100644 --- a/br/pkg/restore/ingestrec/ingest_recorder_test.go +++ b/br/pkg/restore/ingestrec/ingest_recorder_test.go @@ -234,7 +234,7 @@ func TestAddIngestRecorder(t *testing.T) { []*model.IndexInfo{ getIndex(1, []string{"x", "y"}), }, - json.RawMessage(`[1, "a"]`), + json.RawMessage(`[1, false, [], false]`), ), false) require.NoError(t, err) f, cnt := hasOneItem(1, "%n,%n", []any{"x", "y"}) @@ -256,7 +256,7 @@ func TestAddIngestRecorder(t *testing.T) { []*model.IndexInfo{ getIndex(1, []string{"x", "y"}), }, - json.RawMessage(`[1, "a"]`), + json.RawMessage(`[1, false, [], false]`), ), false) require.NoError(t, err) f, cnt := hasOneItem(1, "%n,%n", []any{"x", "y"}) @@ -277,7 +277,7 @@ func TestAddIngestRecorder(t *testing.T) { []*model.IndexInfo{ getIndex(1, []string{"x", "y"}), }, - json.RawMessage(`[1, "a"]`), + json.RawMessage(`[1, false, [], false]`), ), true) require.NoError(t, err) f, cnt := hasOneItem(1, "%n,%n", []any{"x", "y"}) @@ -375,7 +375,7 @@ func TestIndexesKind(t *testing.T) { []*model.IndexInfo{ getIndex(1, []string{"x"}), }, - json.RawMessage(`[1, "a"]`), + json.RawMessage(`[1, false, [], false]`), ), false) require.NoError(t, err) err = recorder.UpdateIndexInfo(infoSchema) @@ -472,7 +472,7 @@ func TestRewriteTableID(t *testing.T) { []*model.IndexInfo{ getIndex(1, []string{"x", "y"}), }, - json.RawMessage(`[1, "a"]`), + json.RawMessage(`[1, false, [], false]`), ), false) require.NoError(t, err) err = recorder.UpdateIndexInfo(infoSchema) diff --git a/pkg/ddl/BUILD.bazel b/pkg/ddl/BUILD.bazel index f952cd7fc4cff..b309a70868696 100644 --- a/pkg/ddl/BUILD.bazel +++ b/pkg/ddl/BUILD.bazel @@ -241,7 +241,6 @@ go_test( "index_change_test.go", "index_cop_test.go", "index_modify_test.go", - "index_test.go", "integration_test.go", "job_scheduler_test.go", "job_scheduler_testkit_test.go", diff --git a/pkg/ddl/backfilling_dist_executor.go b/pkg/ddl/backfilling_dist_executor.go index e2bbc86e18343..c9867bc228fd5 100644 --- a/pkg/ddl/backfilling_dist_executor.go +++ b/pkg/ddl/backfilling_dist_executor.go @@ -161,19 +161,13 @@ func (s *backfillDistExecutor) getBackendCtx() (ingest.BackendCtx, error) { } func hasUniqueIndex(job *model.Job) (bool, error) { - var unique bool - err := job.DecodeArgs(&unique) - if err == nil { - return unique, nil - } - - var uniques []bool - err = job.DecodeArgs(&uniques) + args, err := model.GetModifyIndexArgs(job) if err != nil { return false, errors.Trace(err) } - for _, b := range uniques { - if b { + + for _, a := range args.IndexArgs { + if a.Unique { return true, nil } } diff --git a/pkg/ddl/bdr.go b/pkg/ddl/bdr.go index 7bbf26d62ddcb..2f0ea38a832a8 100644 --- a/pkg/ddl/bdr.go +++ b/pkg/ddl/bdr.go @@ -85,7 +85,7 @@ func deniedByBDRWhenModifyColumn(newFieldType, oldFieldType types.FieldType, opt } // DeniedByBDR checks whether the DDL is denied by BDR. -func DeniedByBDR(role ast.BDRRole, action model.ActionType, job *model.Job) (denied bool) { +func DeniedByBDR(role ast.BDRRole, action model.ActionType, args model.JobArgs) (denied bool) { ddlType, ok := model.ActionBDRMap[action] switch role { case ast.BDRRolePrimary: @@ -94,10 +94,10 @@ func DeniedByBDR(role ast.BDRRole, action model.ActionType, job *model.Job) (den } // Can't add unique index on primary role. - if job != nil && (action == model.ActionAddIndex || action == model.ActionAddPrimaryKey) && - len(job.Args) >= 1 && job.Args[0].(bool) { - // job.Args[0] is unique when job.Type is ActionAddIndex or ActionAddPrimaryKey. - return true + if args != nil && (action == model.ActionAddIndex || action == model.ActionAddPrimaryKey) { + if args.(*model.ModifyIndexArgs).IndexArgs[0].Unique { + return true + } } if ddlType == model.SafeDDL || ddlType == model.UnmanagementDDL { diff --git a/pkg/ddl/bdr_test.go b/pkg/ddl/bdr_test.go index 1ec1a69be35cf..bf3997af1dd75 100644 --- a/pkg/ddl/bdr_test.go +++ b/pkg/ddl/bdr_test.go @@ -20,6 +20,7 @@ import ( "github.com/pingcap/tidb/pkg/meta/model" "github.com/pingcap/tidb/pkg/parser/ast" + pmodel "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tidb/pkg/parser/mysql" "github.com/pingcap/tidb/pkg/types" "github.com/stretchr/testify/assert" @@ -475,42 +476,60 @@ func TestDeniedByBDR(t *testing.T) { } // test special cases + indexArgs := &model.ModifyIndexArgs{ + IndexArgs: []*model.IndexArg{{ + Global: false, + IndexName: pmodel.NewCIStr("idx1"), + IndexPartSpecifications: []*ast.IndexPartSpecification{{Length: 2}}, + IndexOption: &ast.IndexOption{}, + HiddenCols: nil, + SQLMode: mysql.ModeANSI, + IndexID: 1, + IfExist: false, + IsGlobal: false, + }}, + OpType: model.OpAddIndex, + } + testCases2 := []struct { role ast.BDRRole action model.ActionType - job *model.Job + unique bool expected bool }{ { - role: ast.BDRRolePrimary, - action: model.ActionAddPrimaryKey, - job: &model.Job{ - Type: model.ActionAddPrimaryKey, - Args: []any{true}, - }, + role: ast.BDRRolePrimary, + action: model.ActionAddPrimaryKey, + unique: true, expected: true, }, { - role: ast.BDRRolePrimary, - action: model.ActionAddIndex, - job: &model.Job{ - Type: model.ActionAddIndex, - Args: []any{true}, - }, + role: ast.BDRRolePrimary, + action: model.ActionAddIndex, + unique: true, expected: true, }, { - role: ast.BDRRolePrimary, - action: model.ActionAddIndex, - job: &model.Job{ - Type: model.ActionAddIndex, - Args: []any{false}, - }, + role: ast.BDRRolePrimary, + action: model.ActionAddIndex, + unique: false, expected: false, }, } for _, tc := range testCases2 { - assert.Equal(t, tc.expected, DeniedByBDR(tc.role, tc.action, tc.job), fmt.Sprintf("role: %v, action: %v", tc.role, tc.action)) + indexArgs.IndexArgs[0].Unique = tc.unique + indexArgs.IndexArgs[0].IsPK = tc.action == model.ActionAddPrimaryKey + for _, ver := range []model.JobVersion{model.JobVersion1, model.JobVersion2} { + job := &model.Job{ + Version: ver, + Type: tc.action, + } + job.FillArgs(indexArgs) + job.Encode(true) + args, err := model.GetModifyIndexArgs(job) + require.NoError(t, err) + assert.Equal(t, tc.expected, DeniedByBDR(tc.role, tc.action, args), fmt.Sprintf("role: %v, action: %v", tc.role, tc.action)) + } } } diff --git a/pkg/ddl/delete_range.go b/pkg/ddl/delete_range.go index 98bb6980953ac..6cd27ff79c877 100644 --- a/pkg/ddl/delete_range.go +++ b/pkg/ddl/delete_range.go @@ -342,30 +342,25 @@ func insertJobIntoDeleteRangeTable(ctx context.Context, wrapper DelRangeExecWrap return errors.Trace(doBatchDeleteTablesRange(ctx, wrapper, job.ID, args.OldPartitionIDs, ea, "truncate partition: physical table ID(s)")) // ActionAddIndex, ActionAddPrimaryKey needs do it, because it needs to be rolled back when it's canceled. case model.ActionAddIndex, model.ActionAddPrimaryKey: - allIndexIDs := make([]int64, 1) - ifExists := make([]bool, 1) - isGlobal := make([]bool, 0, 1) - var partitionIDs []int64 - if err := job.DecodeArgs(&allIndexIDs[0], &ifExists[0], &partitionIDs); err != nil { - if err = job.DecodeArgs(&allIndexIDs, &ifExists, &partitionIDs, &isGlobal); err != nil { - return errors.Trace(err) - } + args, err := model.GetFinishedModifyIndexArgs(job) + if err != nil { + return errors.Trace(err) } // Determine the physicalIDs to be added. physicalIDs := []int64{job.TableID} - if len(partitionIDs) > 0 { - physicalIDs = partitionIDs + if len(args.PartitionIDs) > 0 { + physicalIDs = args.PartitionIDs } - for i, indexID := range allIndexIDs { + for _, indexArg := range args.IndexArgs { // Determine the index IDs to be added. - tempIdxID := tablecodec.TempIndexPrefix | indexID + tempIdxID := tablecodec.TempIndexPrefix | indexArg.IndexID var indexIDs []int64 if job.State == model.JobStateRollbackDone { - indexIDs = []int64{indexID, tempIdxID} + indexIDs = []int64{indexArg.IndexID, tempIdxID} } else { indexIDs = []int64{tempIdxID} } - if len(isGlobal) != 0 && isGlobal[i] { + if indexArg.IsGlobal { if err := doBatchDeleteIndiceRange(ctx, wrapper, job.ID, job.TableID, indexIDs, ea, "add index: physical table ID(s)"); err != nil { return errors.Trace(err) } @@ -378,14 +373,18 @@ func insertJobIntoDeleteRangeTable(ctx context.Context, wrapper DelRangeExecWrap } } case model.ActionDropIndex, model.ActionDropPrimaryKey: - tableID := job.TableID - _, _, allIndexIDs, partitionIDs, _, err := job.DecodeDropIndexFinishedArgs() + args, err := model.GetFinishedModifyIndexArgs(job) if err != nil { return errors.Trace(err) } + + tableID := job.TableID + partitionIDs := args.PartitionIDs + indexIDs := []int64{args.IndexArgs[0].IndexID} + // partitionIDs len is 0 if the dropped index is a global index, even if it is a partitioned table. if len(partitionIDs) == 0 { - return errors.Trace(doBatchDeleteIndiceRange(ctx, wrapper, job.ID, tableID, allIndexIDs, ea, "drop index: table ID")) + return errors.Trace(doBatchDeleteIndiceRange(ctx, wrapper, job.ID, tableID, indexIDs, ea, "drop index: table ID")) } failpoint.Inject("checkDropGlobalIndex", func(val failpoint.Value) { if val.(bool) { @@ -393,7 +392,7 @@ func insertJobIntoDeleteRangeTable(ctx context.Context, wrapper DelRangeExecWrap } }) for _, pid := range partitionIDs { - if err := doBatchDeleteIndiceRange(ctx, wrapper, job.ID, pid, allIndexIDs, ea, "drop index: partition table ID"); err != nil { + if err := doBatchDeleteIndiceRange(ctx, wrapper, job.ID, pid, indexIDs, ea, "drop index: partition table ID"); err != nil { return errors.Trace(err) } } diff --git a/pkg/ddl/executor.go b/pkg/ddl/executor.go index 3fa5c1c147590..1b0747817d90d 100644 --- a/pkg/ddl/executor.go +++ b/pkg/ddl/executor.go @@ -4017,19 +4017,27 @@ func (e *executor) RenameIndex(ctx sessionctx.Context, ident ast.Ident, spec *as return errors.Trace(err) } + // TODO(joechenrh): Switch job version after refactor done. job := &model.Job{ + Version: model.JobVersion1, SchemaID: schema.ID, TableID: tb.Meta().ID, SchemaName: schema.Name.L, TableName: tb.Meta().Name.L, Type: model.ActionRenameIndex, BinlogInfo: &model.HistoryInfo{}, - Args: []any{spec.FromKey, spec.ToKey}, CDCWriteSource: ctx.GetSessionVars().CDCWriteSource, SQLMode: ctx.GetSessionVars().SQLMode, } - err = e.DoDDLJob(ctx, job) + args := &model.ModifyIndexArgs{ + IndexArgs: []*model.IndexArg{ + {IndexName: spec.FromKey}, + {IndexName: spec.ToKey}, + }, + } + job.FillArgs(args) + err = e.doDDLJob2(ctx, job, args) return errors.Trace(err) } @@ -4574,12 +4582,11 @@ func (e *executor) CreatePrimaryKey(ctx sessionctx.Context, ti ast.Ident, indexN } } - unique := true sqlMode := ctx.GetSessionVars().SQLMode // global is set to 'false' is just there to be backwards compatible, // to avoid unmarshal issues, it is now part of indexOption. - global := false job := &model.Job{ + Version: model.JobVersion1, SchemaID: schema.ID, TableID: t.Meta().ID, SchemaName: schema.Name.L, @@ -4587,18 +4594,32 @@ func (e *executor) CreatePrimaryKey(ctx sessionctx.Context, ti ast.Ident, indexN Type: model.ActionAddPrimaryKey, BinlogInfo: &model.HistoryInfo{}, ReorgMeta: nil, - Args: []any{unique, indexName, indexPartSpecifications, indexOption, sqlMode, nil, global}, Priority: ctx.GetSessionVars().DDLReorgPriority, CDCWriteSource: ctx.GetSessionVars().CDCWriteSource, SQLMode: ctx.GetSessionVars().SQLMode, } + + args := &model.ModifyIndexArgs{ + IndexArgs: []*model.IndexArg{{ + Unique: true, + IndexName: indexName, + IndexPartSpecifications: indexPartSpecifications, + IndexOption: indexOption, + SQLMode: sqlMode, + Global: false, + IsPK: true, + }}, + OpType: model.OpAddIndex, + } + reorgMeta, err := newReorgMetaFromVariables(job, ctx) if err != nil { return err } job.ReorgMeta = reorgMeta - err = e.DoDDLJob(ctx, job) + job.FillArgs(args) + err = e.doDDLJob2(ctx, job, args) return errors.Trace(err) } @@ -4709,12 +4730,25 @@ func (e *executor) createVectorIndex(ctx sessionctx.Context, ti ast.Ident, index if err != nil { return errors.Trace(err) } + // vector index is added in 8.4, always use JobVersion2 + job.Version = model.GetJobVerInUse() job.Type = model.ActionAddVectorIndex indexPartSpecifications[0].Expr = nil - job.Args = []any{indexName, indexPartSpecifications[0], indexOption, funcExpr} + // TODO: support CDCWriteSource - err = e.DoDDLJob(ctx, job) + args := &model.ModifyIndexArgs{ + IndexArgs: []*model.IndexArg{{ + IndexName: indexName, + IndexPartSpecifications: indexPartSpecifications, + IndexOption: indexOption, + FuncExpr: funcExpr, + IsVector: true, + }}, + OpType: model.OpAddIndex, + } + + err = e.doDDLJob2(ctx, job, args) // key exists, but if_not_exists flags is true, so we ignore this error. if dbterror.ErrDupKeyName.Equal(err) && ifNotExists { ctx.GetSessionVars().StmtCtx.AppendNote(err) @@ -4871,11 +4905,25 @@ func (e *executor) createIndex(ctx sessionctx.Context, ti ast.Ident, keyType ast if err != nil { return errors.Trace(err) } + // TODO(joechenrh): Switch job version after refactor done. + job.Version = model.JobVersion1 job.Type = model.ActionAddIndex - job.Args = []any{unique, indexName, indexPartSpecifications, indexOption, hiddenCols, global} job.CDCWriteSource = ctx.GetSessionVars().CDCWriteSource - err = e.DoDDLJob(ctx, job) + args := &model.ModifyIndexArgs{ + IndexArgs: []*model.IndexArg{{ + Unique: unique, + IndexName: indexName, + IndexPartSpecifications: indexPartSpecifications, + IndexOption: indexOption, + HiddenCols: hiddenCols, + Global: global, + }}, + OpType: model.OpAddIndex, + } + + job.FillArgs(args) + err = e.doDDLJob2(ctx, job, args) // key exists, but if_not_exists flags is true, so we ignore this error. if dbterror.ErrDupKeyName.Equal(err) && ifNotExists { ctx.GetSessionVars().StmtCtx.AppendNote(err) @@ -5161,7 +5209,7 @@ func (*executor) dropHypoIndexFromCtx(ctx sessionctx.Context, schema, table, ind // dropIndex drops the specified index. // isHypo is used to indicate whether this operation is for a hypo-index. -func (e *executor) dropIndex(ctx sessionctx.Context, ti ast.Ident, indexName pmodel.CIStr, ifExists, isHypo bool) error { +func (e *executor) dropIndex(ctx sessionctx.Context, ti ast.Ident, indexName pmodel.CIStr, ifExist, isHypo bool) error { is := e.infoCache.GetLatest() schema, ok := is.SchemaByName(ti.Schema) if !ok { @@ -5176,7 +5224,7 @@ func (e *executor) dropIndex(ctx sessionctx.Context, ti ast.Ident, indexName pmo } if isHypo { - return e.dropHypoIndexFromCtx(ctx, ti.Schema, ti.Name, indexName, ifExists) + return e.dropHypoIndexFromCtx(ctx, ti.Schema, ti.Name, indexName, ifExist) } indexInfo := t.Meta().FindIndexByName(indexName.L) @@ -5192,7 +5240,7 @@ func (e *executor) dropIndex(ctx sessionctx.Context, ti ast.Ident, indexName pmo if indexInfo == nil { err = dbterror.ErrCantDropFieldOrKey.GenWithStack("index %s doesn't exist", indexName) - if ifExists { + if ifExist { ctx.GetSessionVars().StmtCtx.AppendNote(err) return nil } @@ -5209,7 +5257,9 @@ func (e *executor) dropIndex(ctx sessionctx.Context, ti ast.Ident, indexName pmo jobTp = model.ActionDropPrimaryKey } + // TODO(joechenrh): Switch job version after refactor done. job := &model.Job{ + Version: model.JobVersion1, SchemaID: schema.ID, TableID: t.Meta().ID, SchemaName: schema.Name.L, @@ -5217,12 +5267,19 @@ func (e *executor) dropIndex(ctx sessionctx.Context, ti ast.Ident, indexName pmo TableName: t.Meta().Name.L, Type: jobTp, BinlogInfo: &model.HistoryInfo{}, - Args: []any{indexName, ifExists}, CDCWriteSource: ctx.GetSessionVars().CDCWriteSource, SQLMode: ctx.GetSessionVars().SQLMode, } - err = e.DoDDLJob(ctx, job) + args := &model.ModifyIndexArgs{ + IndexArgs: []*model.IndexArg{{ + IndexName: indexName, + IfExist: ifExist, + }}, + OpType: model.OpDropIndex, + } + job.FillArgs(args) + err = e.doDDLJob2(ctx, job, args) return errors.Trace(err) } diff --git a/pkg/ddl/index.go b/pkg/ddl/index.go index 53ecfb002352a..82d570b1a8434 100644 --- a/pkg/ddl/index.go +++ b/pkg/ddl/index.go @@ -632,42 +632,15 @@ func moveAndUpdateHiddenColumnsToPublic(tblInfo *model.TableInfo, idxInfo *model } } -func decodeAddIndexArgs(job *model.Job) ( - uniques []bool, - indexNames []pmodel.CIStr, - indexPartSpecifications [][]*ast.IndexPartSpecification, - indexOptions []*ast.IndexOption, - hiddenCols [][]*model.ColumnInfo, - err error, -) { - var ( - unique bool - indexName pmodel.CIStr - indexPartSpecification []*ast.IndexPartSpecification - indexOption *ast.IndexOption - hiddenCol []*model.ColumnInfo - ) - err = job.DecodeArgs(&unique, &indexName, &indexPartSpecification, &indexOption, &hiddenCol) - if err == nil { - return []bool{unique}, - []pmodel.CIStr{indexName}, - [][]*ast.IndexPartSpecification{indexPartSpecification}, - []*ast.IndexOption{indexOption}, - [][]*model.ColumnInfo{hiddenCol}, - nil - } - - err = job.DecodeArgs(&uniques, &indexNames, &indexPartSpecifications, &indexOptions, &hiddenCols) - return -} - -func checkAndBuildIndexInfo(job *model.Job, tblInfo *model.TableInfo, indexName pmodel.CIStr, isPK, unique, isVector bool, - indexPartSpecifications []*ast.IndexPartSpecification, indexOption *ast.IndexOption, hiddenCols []*model.ColumnInfo) (*model.IndexInfo, error) { +func checkAndBuildIndexInfo( + job *model.Job, tblInfo *model.TableInfo, + isVector bool, isPK bool, args *model.IndexArg, +) (*model.IndexInfo, error) { var err error - indexInfo := tblInfo.FindIndexByName(indexName.L) + indexInfo := tblInfo.FindIndexByName(args.IndexName.L) if indexInfo != nil { if indexInfo.State == model.StatePublic { - err = dbterror.ErrDupKeyName.GenWithStack("index already exist %s", indexName) + err = dbterror.ErrDupKeyName.GenWithStack("index already exist %s", args.IndexName) if isPK { err = infoschema.ErrMultiplePriKey } @@ -676,7 +649,7 @@ func checkAndBuildIndexInfo(job *model.Job, tblInfo *model.TableInfo, indexName return indexInfo, nil } - for _, hiddenCol := range hiddenCols { + for _, hiddenCol := range args.HiddenCols { columnInfo := model.FindColumnInfo(tblInfo.Columns, hiddenCol.Name.L) if columnInfo != nil && columnInfo.State == model.StatePublic { // We already have a column with the same column name. @@ -685,8 +658,8 @@ func checkAndBuildIndexInfo(job *model.Job, tblInfo *model.TableInfo, indexName } } - if len(hiddenCols) > 0 { - for _, hiddenCol := range hiddenCols { + if len(args.HiddenCols) > 0 { + for _, hiddenCol := range args.HiddenCols { InitAndAddColumnToTable(tblInfo, hiddenCol) } } @@ -696,19 +669,19 @@ func checkAndBuildIndexInfo(job *model.Job, tblInfo *model.TableInfo, indexName indexInfo, err = BuildIndexInfo( nil, tblInfo, - indexName, + args.IndexName, isPK, - unique, + args.Unique, isVector, - indexPartSpecifications, - indexOption, + args.IndexPartSpecifications, + args.IndexOption, model.StateNone, ) if err != nil { return nil, errors.Trace(err) } if isPK { - if _, err = CheckPKOnGeneratedColumn(tblInfo, indexPartSpecifications); err != nil { + if _, err = CheckPKOnGeneratedColumn(tblInfo, args.IndexPartSpecifications); err != nil { return nil, err } } @@ -747,27 +720,22 @@ func (w *worker) onCreateVectorIndex(jobCtx *jobContext, job *model.Job) (ver in return ver, errors.Trace(err) } - var ( - indexName pmodel.CIStr - indexOption *ast.IndexOption - indexPartSpecification *ast.IndexPartSpecification - funcExpr string - ) - err = job.DecodeArgs(&indexName, &indexPartSpecification, &indexOption, &funcExpr) + args, err := model.GetModifyIndexArgs(job) if err != nil { job.State = model.JobStateCancelled return ver, errors.Trace(err) } - indexPartSpecification.Expr, err = generatedexpr.ParseExpression(funcExpr) + a := args.IndexArgs[0] + a.IndexPartSpecifications[0].Expr, err = generatedexpr.ParseExpression(a.FuncExpr) if err != nil { job.State = model.JobStateCancelled return ver, errors.Trace(err) } defer func() { - indexPartSpecification.Expr = nil + a.IndexPartSpecifications[0].Expr = nil }() - indexInfo, err := checkAndBuildIndexInfo(job, tblInfo, indexName, false, false, true, []*ast.IndexPartSpecification{indexPartSpecification}, indexOption, nil) + indexInfo, err := checkAndBuildIndexInfo(job, tblInfo, true, false, a) if err != nil { return ver, errors.Trace(err) } @@ -836,7 +804,14 @@ func (w *worker) onCreateVectorIndex(jobCtx *jobContext, job *model.Job) (ver in if err != nil { return ver, errors.Trace(err) } - job.Args = []any{indexInfo.ID, false /*if exists*/, getPartitionIDs(tblInfo)} + + finishedArgs := &model.ModifyIndexArgs{ + IndexArgs: []*model.IndexArg{{IndexID: indexInfo.ID}}, + PartitionIDs: getPartitionIDs(tblInfo), + OpType: model.OpAddIndex, + } + job.FillFinishedArgs(finishedArgs) + // Finish this job. job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo) logutil.DDLLogger().Info("[ddl] run add vector index job done", @@ -970,28 +945,15 @@ func (w *worker) onCreateIndex(jobCtx *jobContext, job *model.Job, isPK bool) (v return ver, errors.Trace(dbterror.ErrOptOnCacheTable.GenWithStackByArgs("Create Index")) } - uniques := make([]bool, 1) - indexNames := make([]pmodel.CIStr, 1) - indexPartSpecifications := make([][]*ast.IndexPartSpecification, 1) - indexOption := make([]*ast.IndexOption, 1) - var sqlMode mysql.SQLMode - var warnings []string - hiddenCols := make([][]*model.ColumnInfo, 1) - - if isPK { - // Notice: sqlMode and warnings is used to support non-strict mode. - err = job.DecodeArgs(&uniques[0], &indexNames[0], &indexPartSpecifications[0], &indexOption[0], &sqlMode, &warnings) - } else { - uniques, indexNames, indexPartSpecifications, indexOption, hiddenCols, err = decodeAddIndexArgs(job) - } + args, err := model.GetModifyIndexArgs(job) if err != nil { job.State = model.JobStateCancelled return ver, errors.Trace(err) } - allIndexInfos := make([]*model.IndexInfo, 0, len(indexNames)) - for i, indexName := range indexNames { - indexInfo, err := checkAndBuildIndexInfo(job, tblInfo, indexName, isPK, uniques[i], false, indexPartSpecifications[i], indexOption[i], hiddenCols[i]) + allIndexInfos := make([]*model.IndexInfo, 0, len(args.IndexArgs)) + for _, arg := range args.IndexArgs { + indexInfo, err := checkAndBuildIndexInfo(job, tblInfo, false, job.Type == model.ActionAddPrimaryKey, arg) if err != nil { job.State = model.JobStateCancelled return ver, errors.Trace(err) @@ -1109,15 +1071,19 @@ SwitchIndexState: return ver, errors.Trace(err) } - allIndexIDs := make([]int64, 0, len(allIndexInfos)) - ifExists := make([]bool, 0, len(allIndexInfos)) - isGlobal := make([]bool, 0, len(allIndexInfos)) + a := &model.ModifyIndexArgs{ + PartitionIDs: getPartitionIDs(tbl.Meta()), + OpType: model.OpAddIndex, + } for _, indexInfo := range allIndexInfos { - allIndexIDs = append(allIndexIDs, indexInfo.ID) - ifExists = append(ifExists, false) - isGlobal = append(isGlobal, indexInfo.Global) + a.IndexArgs = append(a.IndexArgs, &model.IndexArg{ + IndexID: indexInfo.ID, + IfExist: false, + IsGlobal: indexInfo.Global, + }) } - job.Args = []any{allIndexIDs, ifExists, getPartitionIDs(tbl.Meta()), isGlobal} + job.FillFinishedArgs(a) + // Finish this job. job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo) if !job.ReorgMeta.IsDistReorg && job.ReorgMeta.ReorgTp == model.ReorgTypeLitMerge { @@ -1438,14 +1404,14 @@ func onDropIndex(jobCtx *jobContext, job *model.Job) (ver int64, _ error) { } case model.StateDeleteReorganization: // reorganization -> absent - idxIDs := make([]int64, 0, len(allIndexInfos)) + indexIDs := make([]int64, 0, len(allIndexInfos)) for _, indexInfo := range allIndexInfos { indexInfo.State = model.StateNone // Set column index flag. DropIndexColumnFlag(tblInfo, indexInfo) RemoveDependentHiddenColumns(tblInfo, indexInfo) removeIndexInfo(tblInfo, indexInfo) - idxIDs = append(idxIDs, indexInfo.ID) + indexIDs = append(indexIDs, indexInfo.ID) } failpoint.Inject("mockExceedErrorLimit", func(val failpoint.Value) { @@ -1462,20 +1428,43 @@ func onDropIndex(jobCtx *jobContext, job *model.Job) (ver int64, _ error) { // Finish this job. if job.IsRollingback() { + dropArgs, err := model.GetFinishedModifyIndexArgs(job) job.FinishTableJob(model.JobStateRollbackDone, model.StateNone, ver, tblInfo) - job.Args[0] = idxIDs + if err != nil { + return ver, errors.Trace(err) + } + + // Convert drop index args to finished add index args again to finish add index job. + // Only rolled back add index jobs will get here, since drop index jobs can only be cancelled, not rolled back. + addIndexArgs := &model.ModifyIndexArgs{ + PartitionIDs: dropArgs.PartitionIDs, + OpType: model.OpAddIndex, + } + for i, indexID := range indexIDs { + addIndexArgs.IndexArgs = append(addIndexArgs.IndexArgs, + &model.IndexArg{ + IndexID: indexID, + IfExist: dropArgs.IndexArgs[i].IfExist, + }) + } + job.FillFinishedArgs(addIndexArgs) } else { // the partition ids were append by convertAddIdxJob2RollbackJob, it is weird, but for the compatibility, // we should keep appending the partitions in the convertAddIdxJob2RollbackJob. job.FinishTableJob(model.JobStateDone, model.StateNone, ver, tblInfo) - isVector := allIndexInfos[0].VectorInfo != nil // Global index key has t{tableID}_ prefix. // Assign partitionIDs empty to guarantee correct prefix in insertJobIntoDeleteRangeTable. - if allIndexInfos[0].Global { - job.Args = append(job.Args, idxIDs[0], []int64{}, isVector) - } else { - job.Args = append(job.Args, idxIDs[0], getPartitionIDs(tblInfo), isVector) + dropArgs, err := model.GetDropIndexArgs(job) + dropArgs.OpType = model.OpDropIndex + if err != nil { + return ver, errors.Trace(err) + } + dropArgs.IndexArgs[0].IndexID = indexIDs[0] + dropArgs.IndexArgs[0].IsVector = allIndexInfos[0].VectorInfo != nil + if !allIndexInfos[0].Global { + dropArgs.PartitionIDs = getPartitionIDs(tblInfo) } + job.FillFinishedArgs(dropArgs) } default: return ver, errors.Trace(dbterror.ErrInvalidDDLState.GenWithStackByArgs("index", allIndexInfos[0].State)) @@ -1527,21 +1516,18 @@ func checkDropIndex(infoCache *infoschema.InfoCache, t *meta.Mutator, job *model return nil, nil, false, errors.Trace(err) } - indexNames := make([]pmodel.CIStr, 1) - ifExists := make([]bool, 1) - if err = job.DecodeArgs(&indexNames[0], &ifExists[0]); err != nil { - if err = job.DecodeArgs(&indexNames, &ifExists); err != nil { - job.State = model.JobStateCancelled - return nil, nil, false, errors.Trace(err) - } + args, err := model.GetDropIndexArgs(job) + if err != nil { + job.State = model.JobStateCancelled + return nil, nil, false, errors.Trace(err) } - indexInfos := make([]*model.IndexInfo, 0, len(indexNames)) - for i, idxName := range indexNames { - indexInfo := tblInfo.FindIndexByName(idxName.L) + indexInfos := make([]*model.IndexInfo, 0, len(args.IndexArgs)) + for _, idxArg := range args.IndexArgs { + indexInfo := tblInfo.FindIndexByName(idxArg.IndexName.L) if indexInfo == nil { job.State = model.JobStateCancelled - return nil, nil, ifExists[i], dbterror.ErrCantDropFieldOrKey.GenWithStack("index %s doesn't exist", idxName) + return nil, nil, idxArg.IfExist, dbterror.ErrCantDropFieldOrKey.GenWithStack("index %s doesn't exist", idxArg.IndexName) } // Check that drop primary index will not cause invisible implicit primary index. @@ -1591,10 +1577,12 @@ func checkRenameIndex(t *meta.Mutator, job *model.Job) (*model.TableInfo, pmodel return nil, from, to, errors.Trace(err) } - if err := job.DecodeArgs(&from, &to); err != nil { + args, err := model.GetModifyIndexArgs(job) + if err != nil { job.State = model.JobStateCancelled return nil, from, to, errors.Trace(err) } + from, to = args.GetRenameIndexes() // Double check. See function `RenameIndex` in executor.go duplicate, err := ValidateRenameIndex(from, to, tblInfo) diff --git a/pkg/ddl/index_test.go b/pkg/ddl/index_test.go deleted file mode 100644 index eaf0b6850edaa..0000000000000 --- a/pkg/ddl/index_test.go +++ /dev/null @@ -1,153 +0,0 @@ -// Copyright 2024 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package ddl - -import ( - "encoding/json" - "testing" - - "github.com/pingcap/tidb/pkg/meta/model" - "github.com/pingcap/tidb/pkg/parser/ast" - pmodel "github.com/pingcap/tidb/pkg/parser/model" - "github.com/stretchr/testify/require" -) - -func TestDecodeAddIndexArgsCompatibility(t *testing.T) { - cases := []struct { - raw json.RawMessage - uniques []bool - indexNames []pmodel.CIStr - indexPartSpecifications [][]*ast.IndexPartSpecification - indexOptions []*ast.IndexOption - hiddenCols [][]*model.ColumnInfo - }{ - { - raw: json.RawMessage(`[ -true, -{"O":"t","L":"t"}, -[ - {"Column":{"Schema":{"O":"","L":""},"Table":{"O":"","L":""},"Name":{"O":"a","L":"a"}},"Length":-1,"Desc":false,"Expr":null}, - {"Column":{"Schema":{"O":"","L":""},"Table":{"O":"","L":""},"Name":{"O":"b","L":"b"}},"Length":-1,"Desc":false,"Expr":null} -], -null, -[], -false]`), - uniques: []bool{true}, - indexNames: []pmodel.CIStr{ - {O: "t", L: "t"}, - }, - indexPartSpecifications: [][]*ast.IndexPartSpecification{ - { - { - Column: &ast.ColumnName{ - Schema: pmodel.CIStr{O: "", L: ""}, - Table: pmodel.CIStr{O: "", L: ""}, - Name: pmodel.CIStr{O: "a", L: "a"}, - }, - Length: -1, - Desc: false, - Expr: nil, - }, - { - Column: &ast.ColumnName{ - Schema: pmodel.CIStr{O: "", L: ""}, - Table: pmodel.CIStr{O: "", L: ""}, - Name: pmodel.CIStr{O: "b", L: "b"}, - }, - Length: -1, - Desc: false, - Expr: nil, - }, - }, - }, - indexOptions: []*ast.IndexOption{nil}, - hiddenCols: [][]*model.ColumnInfo{{}}, - }, - { - raw: json.RawMessage(`[ -[false,true], -[{"O":"t","L":"t"},{"O":"t1","L":"t1"}], -[ - [ - {"Column":{"Schema":{"O":"","L":""},"Table":{"O":"","L":""},"Name":{"O":"a","L":"a"}},"Length":-1,"Desc":false,"Expr":null}, - {"Column":{"Schema":{"O":"","L":""},"Table":{"O":"","L":""},"Name":{"O":"b","L":"b"}},"Length":-1,"Desc":false,"Expr":null} - ], - [ - {"Column":{"Schema":{"O":"","L":""},"Table":{"O":"","L":""},"Name":{"O":"a","L":"a"}},"Length":-1,"Desc":false,"Expr":null} - ] -], -[null,null], -[[],[]], -[false,false]]`), - uniques: []bool{false, true}, - indexNames: []pmodel.CIStr{ - {O: "t", L: "t"}, {O: "t1", L: "t1"}, - }, - indexPartSpecifications: [][]*ast.IndexPartSpecification{ - { - { - Column: &ast.ColumnName{ - Schema: pmodel.CIStr{O: "", L: ""}, - Table: pmodel.CIStr{O: "", L: ""}, - Name: pmodel.CIStr{O: "a", L: "a"}, - }, - Length: -1, - Desc: false, - Expr: nil, - }, - { - Column: &ast.ColumnName{ - Schema: pmodel.CIStr{O: "", L: ""}, - Table: pmodel.CIStr{O: "", L: ""}, - Name: pmodel.CIStr{O: "b", L: "b"}, - }, - Length: -1, - Desc: false, - Expr: nil, - }, - }, - { - { - Column: &ast.ColumnName{ - Schema: pmodel.CIStr{O: "", L: ""}, - Table: pmodel.CIStr{O: "", L: ""}, - Name: pmodel.CIStr{O: "a", L: "a"}, - }, - Length: -1, - Desc: false, - Expr: nil, - }, - }, - }, - indexOptions: []*ast.IndexOption{nil, nil}, - hiddenCols: [][]*model.ColumnInfo{{}, {}}, - }, - } - - for _, c := range cases { - job := &model.Job{ - Version: model.JobVersion1, - Type: model.ActionAddIndex, - RawArgs: c.raw, - } - uniques, indexNames, specs, indexOptions, hiddenCols, err := decodeAddIndexArgs(job) - require.NoError(t, err) - require.Equal(t, c.uniques, uniques) - require.Equal(t, c.indexNames, indexNames) - require.Equal(t, c.indexPartSpecifications, specs) - require.Equal(t, c.indexOptions, indexOptions) - require.Equal(t, c.hiddenCols, hiddenCols) - } -} diff --git a/pkg/ddl/job_submitter.go b/pkg/ddl/job_submitter.go index 772b9c167b86a..dba856e53354a 100644 --- a/pkg/ddl/job_submitter.go +++ b/pkg/ddl/job_submitter.go @@ -327,11 +327,11 @@ func (s *JobSubmitter) addBatchDDLJobs2Table(jobWs []*JobWrapper) error { if job.CDCWriteSource == 0 && bdrRole != string(ast.BDRRoleNone) { if job.Type == model.ActionMultiSchemaChange && job.MultiSchemaInfo != nil { for _, subJob := range job.MultiSchemaInfo.SubJobs { - if DeniedByBDR(ast.BDRRole(bdrRole), subJob.Type, job) { + if DeniedByBDR(ast.BDRRole(bdrRole), subJob.Type, subJob.JobArgs) { return dbterror.ErrBDRRestrictedDDL.FastGenByArgs(bdrRole) } } - } else if DeniedByBDR(ast.BDRRole(bdrRole), job.Type, job) { + } else if DeniedByBDR(ast.BDRRole(bdrRole), job.Type, jobW.JobArgs) { return dbterror.ErrBDRRestrictedDDL.FastGenByArgs(bdrRole) } } diff --git a/pkg/ddl/job_worker.go b/pkg/ddl/job_worker.go index c0f0885159012..dea816eb8c043 100644 --- a/pkg/ddl/job_worker.go +++ b/pkg/ddl/job_worker.go @@ -315,13 +315,13 @@ func JobNeedGC(job *model.Job) bool { model.ActionAlterTablePartitioning: return true case model.ActionDropIndex: - _, _, _, _, hasVectors, err := job.DecodeDropIndexFinishedArgs() + args, err := model.GetFinishedModifyIndexArgs(job) if err != nil { return false } // If it's a vector index, it needn't to store key ranges to gc_delete_range. // We don't support drop vector index in multi-schema, so we only check the first one. - if len(hasVectors) > 0 && hasVectors[0] { + if args.IndexArgs[0].IsVector { return false } return true diff --git a/pkg/ddl/multi_schema_change.go b/pkg/ddl/multi_schema_change.go index 5b11f8c8a07b7..aef9ca83a78b5 100644 --- a/pkg/ddl/multi_schema_change.go +++ b/pkg/ddl/multi_schema_change.go @@ -23,6 +23,7 @@ import ( "github.com/pingcap/tidb/pkg/sessionctx" "github.com/pingcap/tidb/pkg/table" "github.com/pingcap/tidb/pkg/util/dbterror" + "github.com/pingcap/tidb/pkg/util/intest" ) func onMultiSchemaChange(w *worker, jobCtx *jobContext, job *model.Job) (ver int64, err error) { @@ -186,6 +187,7 @@ func appendToSubJobs(m *model.MultiSchemaInfo, jobW *JobWrapper) error { } m.SubJobs = append(m.SubJobs, &model.SubJob{ Type: jobW.Type, + JobArgs: jobW.JobArgs, Args: jobW.Args, RawArgs: jobW.RawArgs, SchemaState: jobW.SchemaState, @@ -214,27 +216,27 @@ func fillMultiSchemaInfo(info *model.MultiSchemaInfo, job *JobWrapper) error { colName := job.JobArgs.(*model.TableColumnArgs).Col.Name info.DropColumns = append(info.DropColumns, colName) case model.ActionDropIndex, model.ActionDropPrimaryKey: - indexName := job.Args[0].(pmodel.CIStr) - info.DropIndexes = append(info.DropIndexes, indexName) + args := job.JobArgs.(*model.ModifyIndexArgs) + info.DropIndexes = append(info.DropIndexes, args.IndexArgs[0].IndexName) case model.ActionAddIndex, model.ActionAddPrimaryKey: - indexName := job.Args[1].(pmodel.CIStr) - indexPartSpecifications := job.Args[2].([]*ast.IndexPartSpecification) - info.AddIndexes = append(info.AddIndexes, indexName) - for _, indexPartSpecification := range indexPartSpecifications { + args := job.JobArgs.(*model.ModifyIndexArgs) + // This job has not been merged, len(args) should be one. + intest.Assert(len(args.IndexArgs) == 1, "len(args.IndexArgs) != 1") + indexArg := args.IndexArgs[0] + info.AddIndexes = append(info.AddIndexes, indexArg.IndexName) + for _, indexPartSpecification := range indexArg.IndexPartSpecifications { info.RelativeColumns = append(info.RelativeColumns, indexPartSpecification.Column.Name) } - if hiddenCols, ok := job.Args[4].([]*model.ColumnInfo); ok { - for _, c := range hiddenCols { - for depColName := range c.Dependences { - info.RelativeColumns = append(info.RelativeColumns, pmodel.NewCIStr(depColName)) - } + for _, c := range indexArg.HiddenCols { + for depColName := range c.Dependences { + info.RelativeColumns = append(info.RelativeColumns, pmodel.NewCIStr(depColName)) } } case model.ActionRenameIndex: - from := job.Args[0].(pmodel.CIStr) - to := job.Args[1].(pmodel.CIStr) - info.AddIndexes = append(info.AddIndexes, to) - info.DropIndexes = append(info.DropIndexes, from) + args := job.JobArgs.(*model.ModifyIndexArgs) + from, to := args.GetRenameIndexes() + info.AddIndexes = append(info.AddIndexes, from) + info.DropIndexes = append(info.DropIndexes, to) case model.ActionModifyColumn: newCol := *job.Args[0].(**model.ColumnInfo) oldColName := job.Args[1].(pmodel.CIStr) @@ -343,30 +345,28 @@ func mergeAddIndex(info *model.MultiSchemaInfo) { } if mergeCnt <= 1 { - // no add index job in this multi-schema change. + // No multiple add index jobs in this multi-schema change. return } - var unique []bool - var indexNames []pmodel.CIStr - var indexPartSpecifications [][]*ast.IndexPartSpecification - var indexOption []*ast.IndexOption - var hiddenCols [][]*model.ColumnInfo - newSubJobs := make([]*model.SubJob, 0, len(info.SubJobs)) + newAddIndexesArgs := &model.ModifyIndexArgs{OpType: model.OpAddIndex} + for _, subJob := range info.SubJobs { if subJob.Type == model.ActionAddIndex { - unique = append(unique, subJob.Args[0].(bool)) - indexNames = append(indexNames, subJob.Args[1].(pmodel.CIStr)) - indexPartSpecifications = append(indexPartSpecifications, subJob.Args[2].([]*ast.IndexPartSpecification)) - indexOption = append(indexOption, subJob.Args[3].(*ast.IndexOption)) - hiddenCols = append(hiddenCols, subJob.Args[4].([]*model.ColumnInfo)) + args := subJob.JobArgs.(*model.ModifyIndexArgs) + newAddIndexesArgs.IndexArgs = append(newAddIndexesArgs.IndexArgs, args.IndexArgs...) } else { newSubJobs = append(newSubJobs, subJob) } } - mergedSubJob.Args = []any{unique, indexNames, indexPartSpecifications, indexOption, hiddenCols} + // Fill args for new subjob + proxyJob := &model.Job{Version: model.JobVersion1} + proxyJob.FillArgs(newAddIndexesArgs) + mergedSubJob.Args = proxyJob.Args + mergedSubJob.JobArgs = newAddIndexesArgs + // place the merged add index job at the end of the sub-jobs. newSubJobs = append(newSubJobs, mergedSubJob) info.SubJobs = newSubJobs diff --git a/pkg/ddl/rollingback.go b/pkg/ddl/rollingback.go index b83b09abfd78a..d20999b8f8d46 100644 --- a/pkg/ddl/rollingback.go +++ b/pkg/ddl/rollingback.go @@ -21,8 +21,6 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/tidb/pkg/ddl/ingest" "github.com/pingcap/tidb/pkg/meta/model" - "github.com/pingcap/tidb/pkg/parser/ast" - pmodel "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tidb/pkg/parser/mysql" "github.com/pingcap/tidb/pkg/parser/terror" "github.com/pingcap/tidb/pkg/sessionctx/variable" @@ -57,9 +55,12 @@ func convertAddIdxJob2RollbackJob( } }) + dropArgs := &model.ModifyIndexArgs{ + PartitionIDs: getPartitionIDs(tblInfo), + OpType: model.OpRollbackAddIndex, + } + originalState := allIndexInfos[0].State - idxNames := make([]pmodel.CIStr, 0, len(allIndexInfos)) - ifExists := make([]bool, 0, len(allIndexInfos)) for _, indexInfo := range allIndexInfos { if indexInfo.Primary { nullCols, err := getNullColInfos(tblInfo, indexInfo) @@ -76,12 +77,15 @@ func convertAddIdxJob2RollbackJob( // The write reorganization state in add index job that likes write only state in drop index job. // So the next state is delete only state. indexInfo.State = model.StateDeleteOnly - idxNames = append(idxNames, indexInfo.Name) - ifExists = append(ifExists, false) + dropArgs.IndexArgs = append(dropArgs.IndexArgs, &model.IndexArg{ + IndexName: indexInfo.Name, + IfExist: false, + }) } - // the second and the third args will be used in onDropIndex. - job.Args = []any{idxNames, ifExists, getPartitionIDs(tblInfo)} + // Convert to ModifyIndexArgs + job.FillFinishedArgs(dropArgs) + job.SchemaState = model.StateDeleteOnly ver, err1 := updateVersionAndTableInfo(jobCtx, job, tblInfo, originalState != model.StateDeleteOnly) if err1 != nil { @@ -98,7 +102,7 @@ func convertAddIdxJob2RollbackJob( // convertNotReorgAddIdxJob2RollbackJob converts the add index job that are not started workers to rollingbackJob, // to rollback add index operations. job.SnapshotVer == 0 indicates the workers are not started. -func convertNotReorgAddIdxJob2RollbackJob(jobCtx *jobContext, job *model.Job, occuredErr error, isVector bool) (ver int64, err error) { +func convertNotReorgAddIdxJob2RollbackJob(jobCtx *jobContext, job *model.Job, occuredErr error) (ver int64, err error) { defer func() { if ingest.LitBackCtxMgr != nil { ingest.LitBackCtxMgr.Unregister(job.ID) @@ -110,29 +114,15 @@ func convertNotReorgAddIdxJob2RollbackJob(jobCtx *jobContext, job *model.Job, oc return ver, errors.Trace(err) } - var funcExpr string - var indexPartSpecification *ast.IndexPartSpecification - unique := make([]bool, 1) - indexName := make([]pmodel.CIStr, 1) - indexPartSpecifications := make([][]*ast.IndexPartSpecification, 1) - indexOption := make([]*ast.IndexOption, 1) - - if !isVector { - err = job.DecodeArgs(&unique[0], &indexName[0], &indexPartSpecifications[0], &indexOption[0]) - if err != nil { - err = job.DecodeArgs(&unique, &indexName, &indexPartSpecifications, &indexOption) - } - } else { - err = job.DecodeArgs(&indexName[0], &indexPartSpecification, &indexOption[0], &funcExpr) - } + args, err := model.GetModifyIndexArgs(job) if err != nil { job.State = model.JobStateCancelled return ver, errors.Trace(err) } var indexesInfo []*model.IndexInfo - for _, idxName := range indexName { - indexInfo := tblInfo.FindIndexByName(idxName.L) + for _, a := range args.IndexArgs { + indexInfo := tblInfo.FindIndexByName(a.IndexName.L) if indexInfo != nil { indexesInfo = append(indexesInfo, indexInfo) } @@ -275,7 +265,7 @@ func rollingbackAddVectorIndex(w *worker, jobCtx *jobContext, job *model.Job) (v ver, err = w.onCreateVectorIndex(jobCtx, job) } else { // add index's reorg workers are not running, remove the indexInfo in tableInfo. - ver, err = convertNotReorgAddIdxJob2RollbackJob(jobCtx, job, dbterror.ErrCancelledDDLJob, true) + ver, err = convertNotReorgAddIdxJob2RollbackJob(jobCtx, job, dbterror.ErrCancelledDDLJob) } return } @@ -288,7 +278,7 @@ func rollingbackAddIndex(w *worker, jobCtx *jobContext, job *model.Job, isPK boo ver, err = w.onCreateIndex(jobCtx, job, isPK) } else { // add index's reorg workers are not running, remove the indexInfo in tableInfo. - ver, err = convertNotReorgAddIdxJob2RollbackJob(jobCtx, job, dbterror.ErrCancelledDDLJob, false) + ver, err = convertNotReorgAddIdxJob2RollbackJob(jobCtx, job, dbterror.ErrCancelledDDLJob) } return } diff --git a/pkg/ddl/sanity_check.go b/pkg/ddl/sanity_check.go index 882f1f931b6e3..ba542ffdd0f78 100644 --- a/pkg/ddl/sanity_check.go +++ b/pkg/ddl/sanity_check.go @@ -113,24 +113,20 @@ func expectedDeleteRangeCnt(ctx delRangeCntCtx, job *model.Job) (int, error) { } return len(args.OldPhysicalTblIDs), nil case model.ActionAddIndex, model.ActionAddPrimaryKey: - indexID := make([]int64, 1) - ifExists := make([]bool, 1) - isGlobal := make([]bool, 0, 1) - var partitionIDs []int64 - if err := job.DecodeArgs(&indexID[0], &ifExists[0], &partitionIDs); err != nil { - if err := job.DecodeArgs(&indexID, &ifExists, &partitionIDs, &isGlobal); err != nil { - var unique bool - if err := job.DecodeArgs(&unique); err == nil { - // The first argument is bool means nothing need to be added to delete-range table. - return 0, nil - } - return 0, errors.Trace(err) + args, err := model.GetFinishedModifyIndexArgs(job) + if err != nil { + _, err := model.GetModifyIndexArgs(job) + if err == nil { + // There are nothing need to be added to delete-range table. + return 0, nil } + return 0, errors.Trace(err) } + ret := 0 - for i := 0; i < len(indexID); i++ { - num := mathutil.Max(len(partitionIDs), 1) // Add temporary index to del-range table. - if len(isGlobal) != 0 && isGlobal[i] { + for _, arg := range args.IndexArgs { + num := mathutil.Max(len(args.PartitionIDs), 1) // Add temporary index to del-range table. + if arg.IsGlobal { num = 1 // Global index only has one del-range. } if job.State == model.JobStateRollbackDone { @@ -140,15 +136,15 @@ func expectedDeleteRangeCnt(ctx delRangeCntCtx, job *model.Job) (int, error) { } return ret, nil case model.ActionDropIndex, model.ActionDropPrimaryKey: - _, _, _, partitionIDs, hasVectors, err := job.DecodeDropIndexFinishedArgs() + args, err := model.GetFinishedModifyIndexArgs(job) if err != nil { return 0, errors.Trace(err) } - // We don't support drop vector index in multi-schema, so we only check the first one. - if len(hasVectors) > 0 && hasVectors[0] { + // If it's a vector index, it needn't to store key ranges to gc_delete_range. + if args.IndexArgs[0].IsVector { return 0, nil } - return mathutil.Max(len(partitionIDs), 1), nil + return mathutil.Max(len(args.PartitionIDs), 1), nil case model.ActionDropColumn: args, err := model.GetTableColumnArgs(job) if err != nil { diff --git a/pkg/ddl/stat_test.go b/pkg/ddl/stat_test.go index 413d4e533ef14..5f3f36f304869 100644 --- a/pkg/ddl/stat_test.go +++ b/pkg/ddl/stat_test.go @@ -44,14 +44,14 @@ func TestGetDDLInfo(t *testing.T) { ID: 2, } job := &model.Job{ - Version: model.JobVersion1, + Version: model.GetJobVerInUse(), ID: 1, SchemaID: dbInfo2.ID, Type: model.ActionCreateSchema, RowCount: 0, } job1 := &model.Job{ - Version: model.JobVersion1, + Version: model.GetJobVerInUse(), ID: 2, SchemaID: dbInfo2.ID, Type: model.ActionAddIndex, diff --git a/pkg/meta/model/job.go b/pkg/meta/model/job.go index f885191f1b4ff..17569df0500ae 100644 --- a/pkg/meta/model/job.go +++ b/pkg/meta/model/job.go @@ -536,7 +536,8 @@ func (job *Job) Encode(updateRawArgs bool) ([]byte, error) { continue } - sub.RawArgs, err = marshalArgs(job.Version, sub.Args) + // TODO(joechenrh): Use version of parent job after refactor done. + sub.RawArgs, err = marshalArgs(JobVersion1, sub.Args) if err != nil { return nil, errors.Trace(err) } @@ -586,20 +587,6 @@ func (job *Job) DecodeArgs(args ...any) error { return nil } -// DecodeDropIndexFinishedArgs decodes the drop index job's args when it's finished. -func (job *Job) DecodeDropIndexFinishedArgs() ( - indexName any, ifExists []bool, indexIDs []int64, partitionIDs []int64, hasVectors []bool, err error) { - ifExists = make([]bool, 1) - indexIDs = make([]int64, 1) - hasVectors = make([]bool, 1) - if err := job.DecodeArgs(&indexName, &ifExists[0], &indexIDs[0], &partitionIDs, &hasVectors[0]); err != nil { - if err := job.DecodeArgs(&indexName, &ifExists, &indexIDs, &partitionIDs, &hasVectors); err != nil { - return nil, []bool{false}, []int64{-1}, nil, []bool{false}, errors.Trace(err) - } - } - return -} - // String implements fmt.Stringer interface. func (job *Job) String() string { rowCount := job.GetRowCount() @@ -882,6 +869,7 @@ func (job *Job) GetInvolvingSchemaInfo() []InvolvingSchemaInfo { // (when multi-schema change is not applicable) or more SubJobs. type SubJob struct { Type ActionType `json:"type"` + JobArgs JobArgs `json:"-"` Args []any `json:"-"` RawArgs json.RawMessage `json:"raw_args"` SchemaState SchemaState `json:"schema_state"` diff --git a/pkg/meta/model/job_args.go b/pkg/meta/model/job_args.go index 040d30bebd61f..fffb9286876f3 100644 --- a/pkg/meta/model/job_args.go +++ b/pkg/meta/model/job_args.go @@ -20,6 +20,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/tidb/pkg/parser/ast" pmodel "github.com/pingcap/tidb/pkg/parser/model" + "github.com/pingcap/tidb/pkg/parser/mysql" "github.com/pingcap/tidb/pkg/util/intest" pdhttp "github.com/tikv/pd/client/http" ) @@ -1268,3 +1269,371 @@ func (a *FlashbackClusterArgs) decodeV1(job *Job) error { func GetFlashbackClusterArgs(job *Job) (*FlashbackClusterArgs, error) { return getOrDecodeArgs[*FlashbackClusterArgs](&FlashbackClusterArgs{}, job) } + +// IndexOp is used to identify arguemnt type, which is only used for v1 index args. +// TODO(joechenrh): remove this type after totally switched to v2 +type IndexOp byte + +// List op types. +const ( + OpAddIndex = iota + OpDropIndex + OpRollbackAddIndex +) + +// IndexArg is the argument for single add/drop/rename index operation. +// Different types of job use different fields. +// Below lists used fields for each type (listed in order of the layout in v1) +// +// Adding NonPK: Unique, IndexName, IndexPartSpecifications, IndexOption, SQLMode, Warning(not stored, always nil), Global +// Adding PK: Unique, IndexName, IndexPartSpecifications, IndexOptions, HiddelCols, Global +// Adding vector index: IndexName, IndexPartSpecifications, IndexOption, FuncExpr +// Drop index: IndexName, IfExist, IndexID +// Rollback add index: IndexName, IfExist, IsVector +// Rename index: IndexName +type IndexArg struct { + // Global is never used, we only use Global in IndexOption. Can be deprecated later. + Global bool `json:"-"` + Unique bool `json:"unique,omitempty"` + IndexName pmodel.CIStr `json:"index_name,omitempty"` + IndexPartSpecifications []*ast.IndexPartSpecification `json:"index_part_specifications"` + IndexOption *ast.IndexOption `json:"index_option,omitempty"` + HiddenCols []*ColumnInfo `json:"hidden_cols,omitempty"` + + // For vector index + FuncExpr string `json:"func_expr,omitempty"` + IsVector bool `json:"is_vector,omitempty"` + + // For PK + IsPK bool `json:"is_pk,omitempty"` + SQLMode mysql.SQLMode `json:"sql_mode,omitempty"` + + // IfExist will be used in onDropIndex. + IndexID int64 `json:"index_id,omitempty"` + IfExist bool `json:"if_exist,omitempty"` + IsGlobal bool `json:"is_global,omitempty"` +} + +// ModifyIndexArgs is the argument for add/drop/rename index jobs, +// which includes PK and vector index. +type ModifyIndexArgs struct { + IndexArgs []*IndexArg `json:"index_args,omitempty"` + + // Belows is used for finished args. + PartitionIDs []int64 `json:"partition_ids,omitempty"` + + // This is only used for getFinishedArgsV1 to distinguish different type of job in v1, + // since they need different arguments layout. + // TODO(joechenrh): remove this flag after totally switched to v2 + OpType IndexOp `json:"-"` +} + +func (a *ModifyIndexArgs) getArgsV1(job *Job) []any { + if job.Type == ActionRenameIndex { + return []any{a.IndexArgs[0].IndexName, a.IndexArgs[1].IndexName} + } + + // Drop index + if job.Type == ActionDropIndex || job.Type == ActionDropPrimaryKey { + if len(a.IndexArgs) == 1 { + return []any{a.IndexArgs[0].IndexName, a.IndexArgs[0].IfExist} + } + indexNames := make([]pmodel.CIStr, len(a.IndexArgs)) + ifExists := make([]bool, len(a.IndexArgs)) + for i, idxArg := range a.IndexArgs { + indexNames[i] = idxArg.IndexName + ifExists[i] = idxArg.IfExist + } + return []any{indexNames, ifExists} + } + + // Add vector index + if job.Type == ActionAddVectorIndex { + arg := a.IndexArgs[0] + return []any{arg.IndexName, arg.IndexPartSpecifications[0], arg.IndexOption, arg.FuncExpr} + } + + // Add primary key + if job.Type == ActionAddPrimaryKey { + arg := a.IndexArgs[0] + + // The sixth argument is set and never used. + // Leave it as nil to make it compatible with history job. + return []any{ + arg.Unique, arg.IndexName, arg.IndexPartSpecifications, + arg.IndexOption, arg.SQLMode, nil, arg.Global, + } + } + + // Add index + n := len(a.IndexArgs) + unique := make([]bool, n) + indexName := make([]pmodel.CIStr, n) + indexPartSpecification := make([][]*ast.IndexPartSpecification, n) + indexOption := make([]*ast.IndexOption, n) + hiddenCols := make([][]*ColumnInfo, n) + global := make([]bool, n) + + for i, arg := range a.IndexArgs { + unique[i] = arg.Unique + indexName[i] = arg.IndexName + indexPartSpecification[i] = arg.IndexPartSpecifications + indexOption[i] = arg.IndexOption + hiddenCols[i] = arg.HiddenCols + global[i] = arg.Global + } + + // This is to make the args compatible with old logic + if n == 1 { + return []any{unique[0], indexName[0], indexPartSpecification[0], indexOption[0], hiddenCols[0], global[0]} + } + + return []any{unique, indexName, indexPartSpecification, indexOption, hiddenCols, global} +} + +func (a *ModifyIndexArgs) decodeV1(job *Job) error { + var err error + switch job.Type { + case ActionRenameIndex: + err = a.decodeRenameIndexV1(job) + case ActionAddIndex: + err = a.decodeAddIndexV1(job) + case ActionAddVectorIndex: + err = a.decodeAddVectorIndexV1(job) + case ActionAddPrimaryKey: + err = a.decodeAddPrimaryKeyV1(job) + default: + err = errors.Errorf("Invalid job type for decoding %d", job.Type) + } + return errors.Trace(err) +} + +func (a *ModifyIndexArgs) decodeRenameIndexV1(job *Job) error { + var from, to pmodel.CIStr + if err := job.DecodeArgs(&from, &to); err != nil { + return errors.Trace(err) + } + a.IndexArgs = []*IndexArg{ + {IndexName: from}, + {IndexName: to}, + } + return nil +} + +func (a *ModifyIndexArgs) decodeDropIndexV1(job *Job) error { + indexNames := make([]pmodel.CIStr, 1) + ifExists := make([]bool, 1) + if err := job.DecodeArgs(&indexNames[0], &ifExists[0]); err != nil { + if err = job.DecodeArgs(&indexNames, &ifExists); err != nil { + return errors.Trace(err) + } + } + + a.IndexArgs = make([]*IndexArg, len(indexNames)) + for i, indexName := range indexNames { + a.IndexArgs[i] = &IndexArg{ + IndexName: indexName, + IfExist: ifExists[i], + } + } + return nil +} + +func (a *ModifyIndexArgs) decodeAddIndexV1(job *Job) error { + uniques := make([]bool, 1) + indexNames := make([]pmodel.CIStr, 1) + indexPartSpecifications := make([][]*ast.IndexPartSpecification, 1) + indexOptions := make([]*ast.IndexOption, 1) + hiddenCols := make([][]*ColumnInfo, 1) + globals := make([]bool, 1) + + if err := job.DecodeArgs( + &uniques, &indexNames, &indexPartSpecifications, + &indexOptions, &hiddenCols, &globals); err != nil { + if err = job.DecodeArgs( + &uniques[0], &indexNames[0], &indexPartSpecifications[0], + &indexOptions[0], &hiddenCols[0], &globals[0]); err != nil { + return errors.Trace(err) + } + } + + for i, unique := range uniques { + a.IndexArgs = append(a.IndexArgs, &IndexArg{ + Unique: unique, + IndexName: indexNames[i], + IndexPartSpecifications: indexPartSpecifications[i], + IndexOption: indexOptions[i], + HiddenCols: hiddenCols[i], + Global: globals[i], + }) + } + return nil +} + +func (a *ModifyIndexArgs) decodeAddPrimaryKeyV1(job *Job) error { + a.IndexArgs = []*IndexArg{{IsPK: true}} + var unused any + if err := job.DecodeArgs( + &a.IndexArgs[0].Unique, &a.IndexArgs[0].IndexName, &a.IndexArgs[0].IndexPartSpecifications, + &a.IndexArgs[0].IndexOption, &a.IndexArgs[0].SQLMode, + &unused, &a.IndexArgs[0].Global); err != nil { + return errors.Trace(err) + } + return nil +} + +func (a *ModifyIndexArgs) decodeAddVectorIndexV1(job *Job) error { + var ( + indexName pmodel.CIStr + indexPartSpecification *ast.IndexPartSpecification + indexOption *ast.IndexOption + funcExpr string + ) + + if err := job.DecodeArgs( + &indexName, &indexPartSpecification, &indexOption, &funcExpr); err != nil { + return errors.Trace(err) + } + + a.IndexArgs = []*IndexArg{{ + IndexName: indexName, + IndexPartSpecifications: []*ast.IndexPartSpecification{indexPartSpecification}, + IndexOption: indexOption, + FuncExpr: funcExpr, + IsVector: true, + }} + return nil +} + +func (a *ModifyIndexArgs) getFinishedArgsV1(job *Job) []any { + // Add index + if a.OpType == OpAddIndex { + if job.Type == ActionAddVectorIndex { + return []any{a.IndexArgs[0].IndexID, a.IndexArgs[0].IfExist, a.PartitionIDs, a.IndexArgs[0].IsGlobal} + } + + n := len(a.IndexArgs) + indexIDs := make([]int64, n) + ifExists := make([]bool, n) + isGlobals := make([]bool, n) + for i, arg := range a.IndexArgs { + indexIDs[i] = arg.IndexID + ifExists[i] = arg.IfExist + isGlobals[i] = arg.Global + } + return []any{indexIDs, ifExists, a.PartitionIDs, isGlobals} + } + + // Below is to make the args compatible with old logic: + // 1. For drop index, arguments are [CIStr, bool, int64, []int64, bool]. + // 3. For rollback add index, arguments are [[]CIStr, []bool, []int64]. + if a.OpType == OpRollbackAddIndex { + indexNames := make([]pmodel.CIStr, len(a.IndexArgs)) + ifExists := make([]bool, len(a.IndexArgs)) + for i, idxArg := range a.IndexArgs { + indexNames[i] = idxArg.IndexName + ifExists[i] = idxArg.IfExist + } + return []any{indexNames, ifExists, a.PartitionIDs} + } + + idxArg := a.IndexArgs[0] + return []any{idxArg.IndexName, idxArg.IfExist, idxArg.IndexID, a.PartitionIDs, idxArg.IsVector} +} + +// GetRenameIndexes get name of renamed index. +func (a *ModifyIndexArgs) GetRenameIndexes() (from, to pmodel.CIStr) { + from, to = a.IndexArgs[0].IndexName, a.IndexArgs[1].IndexName + return +} + +// GetModifyIndexArgs gets the add/rename index args. +func GetModifyIndexArgs(job *Job) (*ModifyIndexArgs, error) { + return getOrDecodeArgs(&ModifyIndexArgs{}, job) +} + +// GetDropIndexArgs is only used to get drop index arg. +// The logic is separated from ModifyIndexArgs.decodeV1. +// TODO(joechenrh): replace this function with GetModifyIndexArgs after totally switched to v2. +func GetDropIndexArgs(job *Job) (*ModifyIndexArgs, error) { + if job.Version == JobVersion2 { + return getOrDecodeArgsV2[*ModifyIndexArgs](job) + } + + // For add index jobs(ActionAddIndex, etc.) in v1, it can store both drop index arguments and add index arguments. + // The logic in ModifyIndexArgs.decodeV1 maybe: + // Decode rename index args if type == ActionRenameIndex + // Try decode drop index args + // Try decode add index args if failed + // So we separate this from decodeV1 to avoid unnecessary "try decode" logic. + a := &ModifyIndexArgs{} + err := a.decodeDropIndexV1(job) + return a, errors.Trace(err) +} + +// GetFinishedModifyIndexArgs gets the add/drop index args. +func GetFinishedModifyIndexArgs(job *Job) (*ModifyIndexArgs, error) { + if job.Version == JobVersion2 { + return getOrDecodeArgsV2[*ModifyIndexArgs](job) + } + + if job.IsRollingback() || job.Type == ActionDropIndex || job.Type == ActionDropPrimaryKey { + indexNames := make([]pmodel.CIStr, 1) + ifExists := make([]bool, 1) + indexIDs := make([]int64, 1) + var partitionIDs []int64 + isVector := false + var err error + + if job.IsRollingback() { + // Rollback add indexes + err = job.DecodeArgs(&indexNames, &ifExists, &partitionIDs, &isVector) + } else { + // Finish drop index + err = job.DecodeArgs(&indexNames[0], &ifExists[0], &indexIDs[0], &partitionIDs, &isVector) + } + if err != nil { + return nil, errors.Trace(err) + } + + a := &ModifyIndexArgs{ + PartitionIDs: partitionIDs, + } + a.IndexArgs = make([]*IndexArg, len(indexNames)) + for i, indexName := range indexNames { + a.IndexArgs[i] = &IndexArg{ + IndexName: indexName, + IfExist: ifExists[i], + IsVector: isVector, + } + } + // For drop index, store index id in IndexArgs, no impact on other situations. + // Currently, there is only one indexID since drop index is not supported in multi schema change. + // TODO(joechenrh): modify this and corresponding logic if we need support drop multi indexes in V1. + a.IndexArgs[0].IndexID = indexIDs[0] + + return a, nil + } + + // Add index/vector index/PK + addIndexIDs := make([]int64, 1) + ifExists := make([]bool, 1) + isGlobals := make([]bool, 1) + var partitionIDs []int64 + + // add vector index args doesn't store slice. + if err := job.DecodeArgs(&addIndexIDs[0], &ifExists[0], &partitionIDs, &isGlobals[0]); err != nil { + if err = job.DecodeArgs(&addIndexIDs, &ifExists, &partitionIDs, &isGlobals); err != nil { + return nil, errors.Errorf("Failed to decode finished arguments from job version 1") + } + } + a := &ModifyIndexArgs{PartitionIDs: partitionIDs} + for i, indexID := range addIndexIDs { + a.IndexArgs = append(a.IndexArgs, &IndexArg{ + IndexID: indexID, + IfExist: ifExists[i], + IsGlobal: isGlobals[i], + }) + } + return a, nil +} diff --git a/pkg/meta/model/job_args_test.go b/pkg/meta/model/job_args_test.go index f857235f59f6e..e666c1817b7c6 100644 --- a/pkg/meta/model/job_args_test.go +++ b/pkg/meta/model/job_args_test.go @@ -20,6 +20,7 @@ import ( "github.com/pingcap/tidb/pkg/parser/ast" "github.com/pingcap/tidb/pkg/parser/model" + "github.com/pingcap/tidb/pkg/parser/mysql" "github.com/stretchr/testify/require" pdhttp "github.com/tikv/pd/client/http" ) @@ -962,3 +963,142 @@ func TestAlterTableAttributesArgs(t *testing.T) { require.Equal(t, *inArgs.LabelRule, *args.LabelRule) } } + +func TestAddIndexArgs(t *testing.T) { + inArgs := &ModifyIndexArgs{ + IndexArgs: []*IndexArg{{ + Global: false, + Unique: true, + IndexName: model.NewCIStr("idx1"), + IndexPartSpecifications: []*ast.IndexPartSpecification{{Length: 2}}, + IndexOption: &ast.IndexOption{}, + HiddenCols: []*ColumnInfo{{}, {}}, + SQLMode: mysql.ModeANSI, + IndexID: 1, + IfExist: false, + IsGlobal: false, + FuncExpr: "test_string", + }}, + PartitionIDs: []int64{100, 101, 102}, + OpType: OpAddIndex, + } + + for _, v := range []JobVersion{JobVersion1, JobVersion2} { + inArgs.IndexArgs[0].IsVector = false + inArgs.IndexArgs[0].IsPK = false + j2 := &Job{} + require.NoError(t, j2.Decode(getJobBytes(t, inArgs, v, ActionAddIndex))) + + args, err := GetModifyIndexArgs(j2) + require.NoError(t, err) + + a := args.IndexArgs[0] + require.Equal(t, inArgs.IndexArgs[0].Global, a.Global) + require.Equal(t, inArgs.IndexArgs[0].Unique, a.Unique) + require.Equal(t, inArgs.IndexArgs[0].IndexName, a.IndexName) + require.Equal(t, inArgs.IndexArgs[0].IndexPartSpecifications, a.IndexPartSpecifications) + require.Equal(t, inArgs.IndexArgs[0].IndexOption, a.IndexOption) + require.Equal(t, inArgs.IndexArgs[0].HiddenCols, a.HiddenCols) + } + + for _, v := range []JobVersion{JobVersion1, JobVersion2} { + inArgs.IndexArgs[0].IsVector = false + inArgs.IndexArgs[0].IsPK = true + j2 := &Job{} + require.NoError(t, j2.Decode(getJobBytes(t, inArgs, v, ActionAddPrimaryKey))) + + args, err := GetModifyIndexArgs(j2) + require.NoError(t, err) + + a := args.IndexArgs[0] + require.Equal(t, inArgs.IndexArgs[0].Global, a.Global) + require.Equal(t, inArgs.IndexArgs[0].Unique, a.Unique) + require.Equal(t, inArgs.IndexArgs[0].IndexName, a.IndexName) + require.Equal(t, inArgs.IndexArgs[0].IndexPartSpecifications, a.IndexPartSpecifications) + require.Equal(t, inArgs.IndexArgs[0].SQLMode, a.SQLMode) + require.Equal(t, inArgs.IndexArgs[0].IndexOption, a.IndexOption) + } + + for _, v := range []JobVersion{JobVersion1, JobVersion2} { + inArgs.IndexArgs[0].IsVector = true + inArgs.IndexArgs[0].IsPK = false + j2 := &Job{} + require.NoError(t, j2.Decode(getJobBytes(t, inArgs, v, ActionAddVectorIndex))) + + args, err := GetModifyIndexArgs(j2) + require.NoError(t, err) + + a := args.IndexArgs[0] + require.Equal(t, inArgs.IndexArgs[0].IndexName, a.IndexName) + require.Equal(t, inArgs.IndexArgs[0].IndexPartSpecifications, a.IndexPartSpecifications) + require.Equal(t, inArgs.IndexArgs[0].IndexOption, a.IndexOption) + require.Equal(t, inArgs.IndexArgs[0].FuncExpr, a.FuncExpr) + } + + for _, v := range []JobVersion{JobVersion1, JobVersion2} { + j2 := &Job{} + require.NoError(t, j2.Decode(getFinishedJobBytes(t, inArgs, v, ActionAddIndex))) + + args, err := GetFinishedModifyIndexArgs(j2) + require.NoError(t, err) + + a := args.IndexArgs[0] + require.Equal(t, inArgs.IndexArgs[0].IndexID, a.IndexID) + require.Equal(t, inArgs.IndexArgs[0].IfExist, a.IfExist) + require.Equal(t, inArgs.IndexArgs[0].IsGlobal, a.IsGlobal) + require.Equal(t, inArgs.PartitionIDs, args.PartitionIDs) + } +} + +func TestDropIndexArguements(t *testing.T) { + checkFunc := func(t *testing.T, inArgs *ModifyIndexArgs) { + for _, v := range []JobVersion{JobVersion1, JobVersion2} { + j2 := &Job{} + require.NoError(t, j2.Decode(getJobBytes(t, inArgs, v, ActionDropIndex))) + args, err := GetDropIndexArgs(j2) + require.NoError(t, err) + for i, expect := range inArgs.IndexArgs { + require.EqualValues(t, expect.IndexName, args.IndexArgs[i].IndexName) + require.EqualValues(t, expect.IfExist, args.IndexArgs[i].IfExist) + } + + j2 = &Job{} + require.NoError(t, j2.Decode(getFinishedJobBytes(t, inArgs, v, ActionDropIndex))) + args2, err := GetFinishedModifyIndexArgs(j2) + require.NoError(t, err) + require.EqualValues(t, inArgs.IndexArgs, args2.IndexArgs) + require.EqualValues(t, inArgs.PartitionIDs, args2.PartitionIDs) + } + } + + inArgs := &ModifyIndexArgs{ + IndexArgs: []*IndexArg{ + { + IndexName: model.NewCIStr("i2"), + IfExist: true, + IsVector: true, + IndexID: 1, + }, + }, + PartitionIDs: []int64{100, 101, 102, 103}, + OpType: OpDropIndex, + } + checkFunc(t, inArgs) +} + +func TestGetRenameIndexArgs(t *testing.T) { + inArgs := &ModifyIndexArgs{ + IndexArgs: []*IndexArg{ + {IndexName: model.NewCIStr("old")}, + {IndexName: model.NewCIStr("new")}, + }, + } + for _, v := range []JobVersion{JobVersion1, JobVersion2} { + j2 := &Job{} + require.NoError(t, j2.Decode(getJobBytes(t, inArgs, v, ActionRenameIndex))) + + args, err := GetModifyIndexArgs(j2) + require.NoError(t, err) + require.Equal(t, inArgs, args) + } +}