Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl: args v2 for add/drop/rename index, include PK/vector-index #56130

Merged
merged 39 commits into from
Oct 12, 2024
Merged
Show file tree
Hide file tree
Changes from 34 commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
5cc608a
Add args for add/drop index/primary key
joechenrh Sep 12, 2024
8518713
Update
joechenrh Sep 14, 2024
265db55
Update code
joechenrh Sep 14, 2024
ea07abe
Update code
joechenrh Sep 14, 2024
b475261
Fix test
joechenrh Sep 14, 2024
bc0b14f
Merge branch 'master' into addargs-indexes
joechenrh Sep 18, 2024
2a28718
Fix bazel and add tests
joechenrh Sep 19, 2024
3ffe864
Fix test
joechenrh Sep 19, 2024
c97dd4e
Update decode for v1
joechenrh Sep 20, 2024
9ca24d4
Merge branch 'master' into addargs-indexes
joechenrh Sep 20, 2024
bd046aa
Fix test
joechenrh Sep 20, 2024
dc1642c
Add some comments
joechenrh Sep 20, 2024
cdd9d74
Some refactor
joechenrh Sep 23, 2024
69fffec
Add argument for rename index
joechenrh Sep 23, 2024
a649d2d
Merge branch 'master' into addargs-indexes
joechenrh Sep 29, 2024
3134c5d
Merge vector index
joechenrh Oct 8, 2024
024e2bd
Update
joechenrh Oct 8, 2024
240aea6
Update
joechenrh Oct 8, 2024
2cacf44
Update
joechenrh Oct 8, 2024
3962b53
Add tests for vector index args
joechenrh Oct 8, 2024
1e657b8
Change version of subjobs
joechenrh Oct 8, 2024
e559cb8
Fix bdr bug
joechenrh Oct 8, 2024
38c092c
Fix bdr bug
joechenrh Oct 8, 2024
871bbb3
Fix mysql test
joechenrh Oct 8, 2024
fffe244
Merge branch 'master' into addargs-indexes
joechenrh Oct 9, 2024
a0fec13
Fix after merge
joechenrh Oct 9, 2024
b6bfc48
Fix build
joechenrh Oct 9, 2024
124143a
Address comments
joechenrh Oct 9, 2024
dcc8a59
Fix
joechenrh Oct 9, 2024
436bcf5
Add more comments
joechenrh Oct 10, 2024
d9711a8
Address comments
joechenrh Oct 10, 2024
f8e70e6
Fix bug
joechenrh Oct 10, 2024
7b2bb76
Address comment
joechenrh Oct 10, 2024
6ad4cfd
Remove IsFinishedArg flag
joechenrh Oct 10, 2024
92df5ab
Address comments
joechenrh Oct 10, 2024
cff4182
Merge three args into one
joechenrh Oct 11, 2024
e1177cf
Fix build
joechenrh Oct 11, 2024
fd08cef
Update argument encoding and decoding.
joechenrh Oct 11, 2024
d870599
Address comments
joechenrh Oct 12, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 5 additions & 11 deletions br/pkg/restore/ingestrec/ingest_recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,15 +94,9 @@ func (i *IngestRecorder) TryAddJob(job *model.Job, isSubJob bool) error {
return nil
}

allIndexIDs := make([]int64, 1)
// The job.Args is either `Multi-index: ([]int64, ...)`,
// or `Single-index: (int64, ...)`.
// TODO: it's better to use the public function to parse the
// job's Args.
if err := job.DecodeArgs(&allIndexIDs[0]); err != nil {
if err = job.DecodeArgs(&allIndexIDs); err != nil {
return errors.Trace(err)
}
args, err := model.GetFinishedAddIndexArgs(job)
if err != nil {
return errors.Trace(err)
}

tableindexes, exists := i.items[job.TableID]
Expand All @@ -113,8 +107,8 @@ func (i *IngestRecorder) TryAddJob(job *model.Job, isSubJob bool) error {

// the current information of table/index might be modified by other ddl jobs,
// therefore update the index information at last
for _, indexID := range allIndexIDs {
tableindexes[indexID] = &IngestIndexInfo{
for _, a := range args.IndexArgs {
tableindexes[a.IndexID] = &IngestIndexInfo{
IsPrimary: job.Type == model.ActionAddPrimaryKey,
Updated: false,
}
Expand Down
10 changes: 5 additions & 5 deletions br/pkg/restore/ingestrec/ingest_recorder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ func TestAddIngestRecorder(t *testing.T) {
[]*model.IndexInfo{
getIndex(1, []string{"x", "y"}),
},
json.RawMessage(`[1, "a"]`),
json.RawMessage(`[1, false, [], false]`),
), false)
require.NoError(t, err)
f, cnt := hasOneItem(1, "%n,%n", []any{"x", "y"})
Expand All @@ -256,7 +256,7 @@ func TestAddIngestRecorder(t *testing.T) {
[]*model.IndexInfo{
getIndex(1, []string{"x", "y"}),
},
json.RawMessage(`[1, "a"]`),
json.RawMessage(`[1, false, [], false]`),
), false)
require.NoError(t, err)
f, cnt := hasOneItem(1, "%n,%n", []any{"x", "y"})
Expand All @@ -277,7 +277,7 @@ func TestAddIngestRecorder(t *testing.T) {
[]*model.IndexInfo{
getIndex(1, []string{"x", "y"}),
},
json.RawMessage(`[1, "a"]`),
json.RawMessage(`[1, false, [], false]`),
), true)
require.NoError(t, err)
f, cnt := hasOneItem(1, "%n,%n", []any{"x", "y"})
Expand Down Expand Up @@ -375,7 +375,7 @@ func TestIndexesKind(t *testing.T) {
[]*model.IndexInfo{
getIndex(1, []string{"x"}),
},
json.RawMessage(`[1, "a"]`),
json.RawMessage(`[1, false, [], false]`),
), false)
require.NoError(t, err)
err = recorder.UpdateIndexInfo(infoSchema)
Expand Down Expand Up @@ -472,7 +472,7 @@ func TestRewriteTableID(t *testing.T) {
[]*model.IndexInfo{
getIndex(1, []string{"x", "y"}),
},
json.RawMessage(`[1, "a"]`),
json.RawMessage(`[1, false, [], false]`),
), false)
require.NoError(t, err)
err = recorder.UpdateIndexInfo(infoSchema)
Expand Down
1 change: 0 additions & 1 deletion pkg/ddl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,6 @@ go_test(
"index_change_test.go",
"index_cop_test.go",
"index_modify_test.go",
"index_test.go",
"integration_test.go",
"job_scheduler_test.go",
"job_scheduler_testkit_test.go",
Expand Down
14 changes: 4 additions & 10 deletions pkg/ddl/backfilling_dist_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,19 +161,13 @@ func (s *backfillDistExecutor) getBackendCtx() (ingest.BackendCtx, error) {
}

func hasUniqueIndex(job *model.Job) (bool, error) {
var unique bool
err := job.DecodeArgs(&unique)
if err == nil {
return unique, nil
}

var uniques []bool
err = job.DecodeArgs(&uniques)
args, err := model.GetAddIndexArgs(job)
if err != nil {
return false, errors.Trace(err)
}
for _, b := range uniques {
if b {

for _, a := range args.IndexArgs {
if a.Unique {
return true, nil
}
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/ddl/bdr.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func deniedByBDRWhenModifyColumn(newFieldType, oldFieldType types.FieldType, opt
}

// DeniedByBDR checks whether the DDL is denied by BDR.
func DeniedByBDR(role ast.BDRRole, action model.ActionType, job *model.Job) (denied bool) {
func DeniedByBDR(role ast.BDRRole, action model.ActionType, args model.JobArgs) (denied bool) {
ddlType, ok := model.ActionBDRMap[action]
switch role {
case ast.BDRRolePrimary:
Expand All @@ -94,10 +94,10 @@ func DeniedByBDR(role ast.BDRRole, action model.ActionType, job *model.Job) (den
}

// Can't add unique index on primary role.
if job != nil && (action == model.ActionAddIndex || action == model.ActionAddPrimaryKey) &&
len(job.Args) >= 1 && job.Args[0].(bool) {
// job.Args[0] is unique when job.Type is ActionAddIndex or ActionAddPrimaryKey.
return true
if args != nil && (action == model.ActionAddIndex || action == model.ActionAddPrimaryKey) {
if args.(*model.AddIndexArgs).IndexArgs[0].Unique {
return true
}
}

if ddlType == model.SafeDDL || ddlType == model.UnmanagementDDL {
Expand Down
58 changes: 38 additions & 20 deletions pkg/ddl/bdr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

"github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/parser/ast"
pmodel "github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/types"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -475,42 +476,59 @@ func TestDeniedByBDR(t *testing.T) {
}

// test special cases
indexArgs := &model.AddIndexArgs{
IndexArgs: []*model.IndexArg{{
Global: false,
IndexName: pmodel.NewCIStr("idx1"),
IndexPartSpecifications: []*ast.IndexPartSpecification{{Length: 2}},
IndexOption: &ast.IndexOption{},
HiddenCols: nil,
SQLMode: mysql.ModeANSI,
IndexID: 1,
IfExist: false,
IsGlobal: false,
}},
}

testCases2 := []struct {
role ast.BDRRole
action model.ActionType
job *model.Job
unique bool
expected bool
}{
{
role: ast.BDRRolePrimary,
action: model.ActionAddPrimaryKey,
job: &model.Job{
Type: model.ActionAddPrimaryKey,
Args: []any{true},
},
role: ast.BDRRolePrimary,
action: model.ActionAddPrimaryKey,
unique: true,
expected: true,
},
{
role: ast.BDRRolePrimary,
action: model.ActionAddIndex,
job: &model.Job{
Type: model.ActionAddIndex,
Args: []any{true},
},
role: ast.BDRRolePrimary,
action: model.ActionAddIndex,
unique: true,
expected: true,
},
{
role: ast.BDRRolePrimary,
action: model.ActionAddIndex,
job: &model.Job{
Type: model.ActionAddIndex,
Args: []any{false},
},
role: ast.BDRRolePrimary,
action: model.ActionAddIndex,
unique: false,
expected: false,
},
}

for _, tc := range testCases2 {
assert.Equal(t, tc.expected, DeniedByBDR(tc.role, tc.action, tc.job), fmt.Sprintf("role: %v, action: %v", tc.role, tc.action))
indexArgs.IndexArgs[0].Unique = tc.unique
indexArgs.IndexArgs[0].IsPK = tc.action == model.ActionAddPrimaryKey
for _, ver := range []model.JobVersion{model.JobVersion1, model.JobVersion2} {
job := &model.Job{
Version: ver,
Type: tc.action,
}
job.FillArgs(indexArgs)
job.Encode(true)
args, err := model.GetAddIndexArgs(job)
require.NoError(t, err)
assert.Equal(t, tc.expected, DeniedByBDR(tc.role, tc.action, args), fmt.Sprintf("role: %v, action: %v", tc.role, tc.action))
}
}
}
34 changes: 16 additions & 18 deletions pkg/ddl/delete_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,30 +342,25 @@ func insertJobIntoDeleteRangeTable(ctx context.Context, wrapper DelRangeExecWrap
return errors.Trace(doBatchDeleteTablesRange(ctx, wrapper, job.ID, args.OldPartitionIDs, ea, "truncate partition: physical table ID(s)"))
// ActionAddIndex, ActionAddPrimaryKey needs do it, because it needs to be rolled back when it's canceled.
case model.ActionAddIndex, model.ActionAddPrimaryKey:
allIndexIDs := make([]int64, 1)
ifExists := make([]bool, 1)
isGlobal := make([]bool, 0, 1)
var partitionIDs []int64
if err := job.DecodeArgs(&allIndexIDs[0], &ifExists[0], &partitionIDs); err != nil {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The first decode is useless, since the first two elements are slice after #47135.
So in the new logic, we only decode them using slice.

if err = job.DecodeArgs(&allIndexIDs, &ifExists, &partitionIDs, &isGlobal); err != nil {
return errors.Trace(err)
}
args, err := model.GetFinishedAddIndexArgs(job)
if err != nil {
return errors.Trace(err)
}
// Determine the physicalIDs to be added.
physicalIDs := []int64{job.TableID}
if len(partitionIDs) > 0 {
physicalIDs = partitionIDs
if len(args.PartitionIDs) > 0 {
physicalIDs = args.PartitionIDs
}
for i, indexID := range allIndexIDs {
for _, indexArg := range args.IndexArgs {
// Determine the index IDs to be added.
tempIdxID := tablecodec.TempIndexPrefix | indexID
tempIdxID := tablecodec.TempIndexPrefix | indexArg.IndexID
var indexIDs []int64
if job.State == model.JobStateRollbackDone {
indexIDs = []int64{indexID, tempIdxID}
indexIDs = []int64{indexArg.IndexID, tempIdxID}
} else {
indexIDs = []int64{tempIdxID}
}
if len(isGlobal) != 0 && isGlobal[i] {
if indexArg.IsGlobal {
if err := doBatchDeleteIndiceRange(ctx, wrapper, job.ID, job.TableID, indexIDs, ea, "add index: physical table ID(s)"); err != nil {
return errors.Trace(err)
}
Expand All @@ -378,22 +373,25 @@ func insertJobIntoDeleteRangeTable(ctx context.Context, wrapper DelRangeExecWrap
}
}
case model.ActionDropIndex, model.ActionDropPrimaryKey:
tableID := job.TableID
_, _, allIndexIDs, partitionIDs, _, err := job.DecodeDropIndexFinishedArgs()
args, err := model.GetFinishedDropIndexArgs(job)
if err != nil {
return errors.Trace(err)
}

tableID := job.TableID
partitionIDs := args.PartitionIDs

// partitionIDs len is 0 if the dropped index is a global index, even if it is a partitioned table.
if len(partitionIDs) == 0 {
return errors.Trace(doBatchDeleteIndiceRange(ctx, wrapper, job.ID, tableID, allIndexIDs, ea, "drop index: table ID"))
return errors.Trace(doBatchDeleteIndiceRange(ctx, wrapper, job.ID, tableID, args.IndexIDs, ea, "drop index: table ID"))
}
failpoint.Inject("checkDropGlobalIndex", func(val failpoint.Value) {
if val.(bool) {
panic("drop global index must not delete partition index range")
}
})
for _, pid := range partitionIDs {
if err := doBatchDeleteIndiceRange(ctx, wrapper, job.ID, pid, allIndexIDs, ea, "drop index: partition table ID"); err != nil {
if err := doBatchDeleteIndiceRange(ctx, wrapper, job.ID, pid, args.IndexIDs, ea, "drop index: partition table ID"); err != nil {
return errors.Trace(err)
}
}
Expand Down
Loading