Skip to content

Commit

Permalink
ddl: args v2 for add/drop/rename index, include PK/vector-index (#56130)
Browse files Browse the repository at this point in the history
ref #53930
  • Loading branch information
joechenrh authored Oct 12, 2024
1 parent f1034dc commit 89a5571
Show file tree
Hide file tree
Showing 19 changed files with 810 additions and 430 deletions.
16 changes: 5 additions & 11 deletions br/pkg/restore/ingestrec/ingest_recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,15 +94,9 @@ func (i *IngestRecorder) TryAddJob(job *model.Job, isSubJob bool) error {
return nil
}

allIndexIDs := make([]int64, 1)
// The job.Args is either `Multi-index: ([]int64, ...)`,
// or `Single-index: (int64, ...)`.
// TODO: it's better to use the public function to parse the
// job's Args.
if err := job.DecodeArgs(&allIndexIDs[0]); err != nil {
if err = job.DecodeArgs(&allIndexIDs); err != nil {
return errors.Trace(err)
}
args, err := model.GetFinishedModifyIndexArgs(job)
if err != nil {
return errors.Trace(err)
}

tableindexes, exists := i.items[job.TableID]
Expand All @@ -113,8 +107,8 @@ func (i *IngestRecorder) TryAddJob(job *model.Job, isSubJob bool) error {

// the current information of table/index might be modified by other ddl jobs,
// therefore update the index information at last
for _, indexID := range allIndexIDs {
tableindexes[indexID] = &IngestIndexInfo{
for _, a := range args.IndexArgs {
tableindexes[a.IndexID] = &IngestIndexInfo{
IsPrimary: job.Type == model.ActionAddPrimaryKey,
Updated: false,
}
Expand Down
10 changes: 5 additions & 5 deletions br/pkg/restore/ingestrec/ingest_recorder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ func TestAddIngestRecorder(t *testing.T) {
[]*model.IndexInfo{
getIndex(1, []string{"x", "y"}),
},
json.RawMessage(`[1, "a"]`),
json.RawMessage(`[1, false, [], false]`),
), false)
require.NoError(t, err)
f, cnt := hasOneItem(1, "%n,%n", []any{"x", "y"})
Expand All @@ -256,7 +256,7 @@ func TestAddIngestRecorder(t *testing.T) {
[]*model.IndexInfo{
getIndex(1, []string{"x", "y"}),
},
json.RawMessage(`[1, "a"]`),
json.RawMessage(`[1, false, [], false]`),
), false)
require.NoError(t, err)
f, cnt := hasOneItem(1, "%n,%n", []any{"x", "y"})
Expand All @@ -277,7 +277,7 @@ func TestAddIngestRecorder(t *testing.T) {
[]*model.IndexInfo{
getIndex(1, []string{"x", "y"}),
},
json.RawMessage(`[1, "a"]`),
json.RawMessage(`[1, false, [], false]`),
), true)
require.NoError(t, err)
f, cnt := hasOneItem(1, "%n,%n", []any{"x", "y"})
Expand Down Expand Up @@ -375,7 +375,7 @@ func TestIndexesKind(t *testing.T) {
[]*model.IndexInfo{
getIndex(1, []string{"x"}),
},
json.RawMessage(`[1, "a"]`),
json.RawMessage(`[1, false, [], false]`),
), false)
require.NoError(t, err)
err = recorder.UpdateIndexInfo(infoSchema)
Expand Down Expand Up @@ -472,7 +472,7 @@ func TestRewriteTableID(t *testing.T) {
[]*model.IndexInfo{
getIndex(1, []string{"x", "y"}),
},
json.RawMessage(`[1, "a"]`),
json.RawMessage(`[1, false, [], false]`),
), false)
require.NoError(t, err)
err = recorder.UpdateIndexInfo(infoSchema)
Expand Down
1 change: 0 additions & 1 deletion pkg/ddl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,6 @@ go_test(
"index_change_test.go",
"index_cop_test.go",
"index_modify_test.go",
"index_test.go",
"integration_test.go",
"job_scheduler_test.go",
"job_scheduler_testkit_test.go",
Expand Down
14 changes: 4 additions & 10 deletions pkg/ddl/backfilling_dist_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,19 +161,13 @@ func (s *backfillDistExecutor) getBackendCtx() (ingest.BackendCtx, error) {
}

func hasUniqueIndex(job *model.Job) (bool, error) {
var unique bool
err := job.DecodeArgs(&unique)
if err == nil {
return unique, nil
}

var uniques []bool
err = job.DecodeArgs(&uniques)
args, err := model.GetModifyIndexArgs(job)
if err != nil {
return false, errors.Trace(err)
}
for _, b := range uniques {
if b {

for _, a := range args.IndexArgs {
if a.Unique {
return true, nil
}
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/ddl/bdr.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func deniedByBDRWhenModifyColumn(newFieldType, oldFieldType types.FieldType, opt
}

// DeniedByBDR checks whether the DDL is denied by BDR.
func DeniedByBDR(role ast.BDRRole, action model.ActionType, job *model.Job) (denied bool) {
func DeniedByBDR(role ast.BDRRole, action model.ActionType, args model.JobArgs) (denied bool) {
ddlType, ok := model.ActionBDRMap[action]
switch role {
case ast.BDRRolePrimary:
Expand All @@ -94,10 +94,10 @@ func DeniedByBDR(role ast.BDRRole, action model.ActionType, job *model.Job) (den
}

// Can't add unique index on primary role.
if job != nil && (action == model.ActionAddIndex || action == model.ActionAddPrimaryKey) &&
len(job.Args) >= 1 && job.Args[0].(bool) {
// job.Args[0] is unique when job.Type is ActionAddIndex or ActionAddPrimaryKey.
return true
if args != nil && (action == model.ActionAddIndex || action == model.ActionAddPrimaryKey) {
if args.(*model.ModifyIndexArgs).IndexArgs[0].Unique {
return true
}
}

if ddlType == model.SafeDDL || ddlType == model.UnmanagementDDL {
Expand Down
59 changes: 39 additions & 20 deletions pkg/ddl/bdr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

"github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/parser/ast"
pmodel "github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/types"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -475,42 +476,60 @@ func TestDeniedByBDR(t *testing.T) {
}

// test special cases
indexArgs := &model.ModifyIndexArgs{
IndexArgs: []*model.IndexArg{{
Global: false,
IndexName: pmodel.NewCIStr("idx1"),
IndexPartSpecifications: []*ast.IndexPartSpecification{{Length: 2}},
IndexOption: &ast.IndexOption{},
HiddenCols: nil,
SQLMode: mysql.ModeANSI,
IndexID: 1,
IfExist: false,
IsGlobal: false,
}},
OpType: model.OpAddIndex,
}

testCases2 := []struct {
role ast.BDRRole
action model.ActionType
job *model.Job
unique bool
expected bool
}{
{
role: ast.BDRRolePrimary,
action: model.ActionAddPrimaryKey,
job: &model.Job{
Type: model.ActionAddPrimaryKey,
Args: []any{true},
},
role: ast.BDRRolePrimary,
action: model.ActionAddPrimaryKey,
unique: true,
expected: true,
},
{
role: ast.BDRRolePrimary,
action: model.ActionAddIndex,
job: &model.Job{
Type: model.ActionAddIndex,
Args: []any{true},
},
role: ast.BDRRolePrimary,
action: model.ActionAddIndex,
unique: true,
expected: true,
},
{
role: ast.BDRRolePrimary,
action: model.ActionAddIndex,
job: &model.Job{
Type: model.ActionAddIndex,
Args: []any{false},
},
role: ast.BDRRolePrimary,
action: model.ActionAddIndex,
unique: false,
expected: false,
},
}

for _, tc := range testCases2 {
assert.Equal(t, tc.expected, DeniedByBDR(tc.role, tc.action, tc.job), fmt.Sprintf("role: %v, action: %v", tc.role, tc.action))
indexArgs.IndexArgs[0].Unique = tc.unique
indexArgs.IndexArgs[0].IsPK = tc.action == model.ActionAddPrimaryKey
for _, ver := range []model.JobVersion{model.JobVersion1, model.JobVersion2} {
job := &model.Job{
Version: ver,
Type: tc.action,
}
job.FillArgs(indexArgs)
job.Encode(true)
args, err := model.GetModifyIndexArgs(job)
require.NoError(t, err)
assert.Equal(t, tc.expected, DeniedByBDR(tc.role, tc.action, args), fmt.Sprintf("role: %v, action: %v", tc.role, tc.action))
}
}
}
35 changes: 17 additions & 18 deletions pkg/ddl/delete_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,30 +342,25 @@ func insertJobIntoDeleteRangeTable(ctx context.Context, wrapper DelRangeExecWrap
return errors.Trace(doBatchDeleteTablesRange(ctx, wrapper, job.ID, args.OldPartitionIDs, ea, "truncate partition: physical table ID(s)"))
// ActionAddIndex, ActionAddPrimaryKey needs do it, because it needs to be rolled back when it's canceled.
case model.ActionAddIndex, model.ActionAddPrimaryKey:
allIndexIDs := make([]int64, 1)
ifExists := make([]bool, 1)
isGlobal := make([]bool, 0, 1)
var partitionIDs []int64
if err := job.DecodeArgs(&allIndexIDs[0], &ifExists[0], &partitionIDs); err != nil {
if err = job.DecodeArgs(&allIndexIDs, &ifExists, &partitionIDs, &isGlobal); err != nil {
return errors.Trace(err)
}
args, err := model.GetFinishedModifyIndexArgs(job)
if err != nil {
return errors.Trace(err)
}
// Determine the physicalIDs to be added.
physicalIDs := []int64{job.TableID}
if len(partitionIDs) > 0 {
physicalIDs = partitionIDs
if len(args.PartitionIDs) > 0 {
physicalIDs = args.PartitionIDs
}
for i, indexID := range allIndexIDs {
for _, indexArg := range args.IndexArgs {
// Determine the index IDs to be added.
tempIdxID := tablecodec.TempIndexPrefix | indexID
tempIdxID := tablecodec.TempIndexPrefix | indexArg.IndexID
var indexIDs []int64
if job.State == model.JobStateRollbackDone {
indexIDs = []int64{indexID, tempIdxID}
indexIDs = []int64{indexArg.IndexID, tempIdxID}
} else {
indexIDs = []int64{tempIdxID}
}
if len(isGlobal) != 0 && isGlobal[i] {
if indexArg.IsGlobal {
if err := doBatchDeleteIndiceRange(ctx, wrapper, job.ID, job.TableID, indexIDs, ea, "add index: physical table ID(s)"); err != nil {
return errors.Trace(err)
}
Expand All @@ -378,22 +373,26 @@ func insertJobIntoDeleteRangeTable(ctx context.Context, wrapper DelRangeExecWrap
}
}
case model.ActionDropIndex, model.ActionDropPrimaryKey:
tableID := job.TableID
_, _, allIndexIDs, partitionIDs, _, err := job.DecodeDropIndexFinishedArgs()
args, err := model.GetFinishedModifyIndexArgs(job)
if err != nil {
return errors.Trace(err)
}

tableID := job.TableID
partitionIDs := args.PartitionIDs
indexIDs := []int64{args.IndexArgs[0].IndexID}

// partitionIDs len is 0 if the dropped index is a global index, even if it is a partitioned table.
if len(partitionIDs) == 0 {
return errors.Trace(doBatchDeleteIndiceRange(ctx, wrapper, job.ID, tableID, allIndexIDs, ea, "drop index: table ID"))
return errors.Trace(doBatchDeleteIndiceRange(ctx, wrapper, job.ID, tableID, indexIDs, ea, "drop index: table ID"))
}
failpoint.Inject("checkDropGlobalIndex", func(val failpoint.Value) {
if val.(bool) {
panic("drop global index must not delete partition index range")
}
})
for _, pid := range partitionIDs {
if err := doBatchDeleteIndiceRange(ctx, wrapper, job.ID, pid, allIndexIDs, ea, "drop index: partition table ID"); err != nil {
if err := doBatchDeleteIndiceRange(ctx, wrapper, job.ID, pid, indexIDs, ea, "drop index: partition table ID"); err != nil {
return errors.Trace(err)
}
}
Expand Down
Loading

0 comments on commit 89a5571

Please sign in to comment.