Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl: refactor job args for set/update tiflash-replica and alter-table-placement ddl #56241

Merged
merged 17 commits into from
Sep 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 18 additions & 26 deletions pkg/ddl/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -563,20 +563,21 @@ func (e *executor) ModifySchemaSetTiFlashReplica(sctx sessionctx.Context, stmt *
}

job := &model.Job{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe in future, we can have a newJob function to avoid the job constructor changes need to be edit at many places

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea~

Version: model.GetJobVerInUse(),
SchemaID: dbInfo.ID,
SchemaName: dbInfo.Name.L,
TableID: tbl.ID,
Type: model.ActionSetTiFlashReplica,
BinlogInfo: &model.HistoryInfo{},
Args: []any{*tiflashReplica},
CDCWriteSource: sctx.GetSessionVars().CDCWriteSource,
InvolvingSchemaInfo: []model.InvolvingSchemaInfo{{
Database: dbInfo.Name.L,
Table: model.InvolvingAll,
}},
SQLMode: sctx.GetSessionVars().SQLMode,
}
err := e.DoDDLJob(sctx, job)
args := &model.SetTiFlashReplicaArgs{TiflashReplica: *tiflashReplica}
err := e.doDDLJob2(sctx, job, args)
if err != nil {
oneFail = tbl.ID
fail++
Expand Down Expand Up @@ -641,19 +642,21 @@ func (e *executor) AlterTablePlacement(ctx sessionctx.Context, ident ast.Ident,
}

job := &model.Job{
Version: model.GetJobVerInUse(),
SchemaID: schema.ID,
TableID: tblInfo.ID,
SchemaName: schema.Name.L,
TableName: tblInfo.Name.L,
Type: model.ActionAlterTablePlacement,
BinlogInfo: &model.HistoryInfo{},
CDCWriteSource: ctx.GetSessionVars().CDCWriteSource,
Args: []any{placementPolicyRef},
InvolvingSchemaInfo: involvingSchemaInfo,
SQLMode: ctx.GetSessionVars().SQLMode,
}

err = e.DoDDLJob(ctx, job)
args := &model.AlterTablePlacementArgs{
PlacementPolicyRef: placementPolicyRef,
}
err = e.doDDLJob2(ctx, job, args)
return errors.Trace(err)
}

Expand Down Expand Up @@ -1991,23 +1994,7 @@ func (e *executor) multiSchemaChange(ctx sessionctx.Context, ti ast.Ident, info
// we need refactor this part to support V2 job version after refactor all of ddl types.
var involvingSchemaInfo []model.InvolvingSchemaInfo
for _, j := range subJobs {
switch j.Type {
case model.ActionAlterTablePlacement:
ref, ok := j.Args[0].(*model.PolicyRefInfo)
if !ok {
logFn("unexpected type of policy reference info",
zap.Any("args[0]", j.Args[0]),
zap.String("type", fmt.Sprintf("%T", j.Args[0])))
continue
}
if ref == nil {
continue
}
involvingSchemaInfo = append(involvingSchemaInfo, model.InvolvingSchemaInfo{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

where does it move to?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because the ActionAlterTablePlacement ddl is not belong to multiSchemaChange. It's useless.

Policy: ref.Name.L,
Mode: model.SharedInvolving,
})
case model.ActionAddForeignKey:
if j.Type == model.ActionAddForeignKey {
ref, ok := j.Args[0].(*model.FKInfo)
if !ok {
logFn("unexpected type of foreign key info",
Expand Down Expand Up @@ -3683,17 +3670,18 @@ func (e *executor) AlterTableSetTiFlashReplica(ctx sessionctx.Context, ident ast
}

job := &model.Job{
Version: model.GetJobVerInUse(),
SchemaID: schema.ID,
TableID: tb.Meta().ID,
SchemaName: schema.Name.L,
TableName: tb.Meta().Name.L,
Type: model.ActionSetTiFlashReplica,
BinlogInfo: &model.HistoryInfo{},
Args: []any{*replicaInfo},
CDCWriteSource: ctx.GetSessionVars().CDCWriteSource,
SQLMode: ctx.GetSessionVars().SQLMode,
}
err = e.DoDDLJob(ctx, job)
args := &model.SetTiFlashReplicaArgs{TiflashReplica: *replicaInfo}
err = e.doDDLJob2(ctx, job, args)
return errors.Trace(err)
}

Expand Down Expand Up @@ -3912,17 +3900,21 @@ func (e *executor) UpdateTableReplicaInfo(ctx sessionctx.Context, physicalID int
}

job := &model.Job{
Version: model.GetJobVerInUse(),
SchemaID: db.ID,
TableID: tb.Meta().ID,
SchemaName: db.Name.L,
TableName: tb.Meta().Name.L,
Type: model.ActionUpdateTiFlashReplicaStatus,
BinlogInfo: &model.HistoryInfo{},
Args: []any{available, physicalID},
CDCWriteSource: ctx.GetSessionVars().CDCWriteSource,
SQLMode: ctx.GetSessionVars().SQLMode,
}
err := e.DoDDLJob(ctx, job)
args := &model.UpdateTiFlashReplicaStatusArgs{
Available: available,
PhysicalID: physicalID,
}
err := e.doDDLJob2(ctx, job, args)
return errors.Trace(err)
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/ddl/job_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -868,7 +868,7 @@ func (w *worker) runOneJobStep(
case model.ActionSetTiFlashReplica:
ver, err = w.onSetTableFlashReplica(jobCtx, t, job)
case model.ActionUpdateTiFlashReplicaStatus:
ver, err = onUpdateFlashReplicaStatus(jobCtx, t, job)
ver, err = onUpdateTiFlashReplicaStatus(jobCtx, t, job)
case model.ActionCreateSequence:
ver, err = onCreateSequence(jobCtx, t, job)
case model.ActionAlterIndexVisibility:
Expand Down
18 changes: 9 additions & 9 deletions pkg/ddl/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import (
"github.com/pingcap/tidb/pkg/meta"
"github.com/pingcap/tidb/pkg/meta/autoid"
"github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/parser/charset"
pmodel "github.com/pingcap/tidb/pkg/parser/model"
field_types "github.com/pingcap/tidb/pkg/parser/types"
Expand Down Expand Up @@ -1067,11 +1066,12 @@ func onModifyTableCharsetAndCollate(jobCtx *jobContext, t *meta.Meta, job *model
}

func (w *worker) onSetTableFlashReplica(jobCtx *jobContext, t *meta.Meta, job *model.Job) (ver int64, _ error) {
var replicaInfo ast.TiFlashReplicaSpec
if err := job.DecodeArgs(&replicaInfo); err != nil {
args, err := model.GetSetTiFlashReplicaArgs(job)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
replicaInfo := args.TiflashReplica

tblInfo, err := GetTableInfoAndCancelFaultJob(t, job, job.SchemaID)
if err != nil {
Expand Down Expand Up @@ -1146,13 +1146,13 @@ func (w *worker) checkTiFlashReplicaCount(replicaCount uint64) error {
return checkTiFlashReplicaCount(ctx, replicaCount)
}

func onUpdateFlashReplicaStatus(jobCtx *jobContext, t *meta.Meta, job *model.Job) (ver int64, _ error) {
var available bool
var physicalID int64
if err := job.DecodeArgs(&available, &physicalID); err != nil {
func onUpdateTiFlashReplicaStatus(jobCtx *jobContext, t *meta.Meta, job *model.Job) (ver int64, _ error) {
args, err := model.GetUpdateTiFlashReplicaStatusArgs(job)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
available, physicalID := args.Available, args.PhysicalID

tblInfo, err := GetTableInfoAndCancelFaultJob(t, job, job.SchemaID)
if err != nil {
Expand Down Expand Up @@ -1515,12 +1515,12 @@ func onAlterTablePartitionPlacement(jobCtx *jobContext, t *meta.Meta, job *model
}

func onAlterTablePlacement(jobCtx *jobContext, t *meta.Meta, job *model.Job) (ver int64, err error) {
policyRefInfo := &model.PolicyRefInfo{}
err = job.DecodeArgs(&policyRefInfo)
args, err := model.GetAlterTablePlacementArgs(job)
if err != nil {
job.State = model.JobStateCancelled
return 0, errors.Trace(err)
}
policyRefInfo := args.PlacementPolicyRef

tblInfo, err := GetTableInfoAndCancelFaultJob(t, job, job.SchemaID)
if err != nil {
Expand Down
88 changes: 88 additions & 0 deletions pkg/meta/model/job_args.go
Original file line number Diff line number Diff line change
Expand Up @@ -1291,6 +1291,94 @@ func GetAddCheckConstraintArgs(job *Job) (*AddCheckConstraintArgs, error) {
return getOrDecodeArgsV2[*AddCheckConstraintArgs](job)
}

// AlterTablePlacementArgs is the arguments for alter table placements ddl job.
type AlterTablePlacementArgs struct {
PlacementPolicyRef *PolicyRefInfo `json:"placement_policy_ref,omitempty"`
}

func (a *AlterTablePlacementArgs) fillJob(job *Job) {
if job.Version == JobVersion1 {
job.Args = []any{a.PlacementPolicyRef}
} else {
job.Args = []any{a}
}
}

// GetAlterTablePlacementArgs gets the args for alter table placements ddl job.
func GetAlterTablePlacementArgs(job *Job) (*AlterTablePlacementArgs, error) {
if job.Version == JobVersion1 {
// when the target policy is 'default', policy info is nil
var placementPolicyRef *PolicyRefInfo
if err := job.DecodeArgs(&placementPolicyRef); err != nil {
D3Hunter marked this conversation as resolved.
Show resolved Hide resolved
return nil, errors.Trace(err)
}
return &AlterTablePlacementArgs{
PlacementPolicyRef: placementPolicyRef,
}, nil
}

return getOrDecodeArgsV2[*AlterTablePlacementArgs](job)
}

// SetTiFlashReplicaArgs is the arguments for setting TiFlash replica ddl.
type SetTiFlashReplicaArgs struct {
TiflashReplica ast.TiFlashReplicaSpec `json:"tiflash_replica,omitempty"`
}

func (a *SetTiFlashReplicaArgs) fillJob(job *Job) {
if job.Version == JobVersion1 {
job.Args = []any{a.TiflashReplica}
} else {
job.Args = []any{a}
}
}

// GetSetTiFlashReplicaArgs gets the args for setting TiFlash replica ddl.
func GetSetTiFlashReplicaArgs(job *Job) (*SetTiFlashReplicaArgs, error) {
if job.Version == JobVersion1 {
tiflashReplica := ast.TiFlashReplicaSpec{}
if err := job.DecodeArgs(&tiflashReplica); err != nil {
return nil, errors.Trace(err)
}
return &SetTiFlashReplicaArgs{TiflashReplica: tiflashReplica}, nil
}

return getOrDecodeArgsV2[*SetTiFlashReplicaArgs](job)
}

// UpdateTiFlashReplicaStatusArgs is the arguments for updating TiFlash replica status ddl.
type UpdateTiFlashReplicaStatusArgs struct {
Available bool `json:"available,omitempty"`
PhysicalID int64 `json:"physical_id,omitempty"`
}

func (a *UpdateTiFlashReplicaStatusArgs) fillJob(job *Job) {
if job.Version == JobVersion1 {
job.Args = []any{a.Available, a.PhysicalID}
} else {
job.Args = []any{a}
}
}

// GetUpdateTiFlashReplicaStatusArgs gets the args for updating TiFlash replica status ddl.
func GetUpdateTiFlashReplicaStatusArgs(job *Job) (*UpdateTiFlashReplicaStatusArgs, error) {
if job.Version == JobVersion1 {
var (
available bool
physicalID int64
)
if err := job.DecodeArgs(&available, &physicalID); err != nil {
return nil, errors.Trace(err)
}
return &UpdateTiFlashReplicaStatusArgs{
Available: available,
PhysicalID: physicalID,
}, nil
}

return getOrDecodeArgsV2[*UpdateTiFlashReplicaStatusArgs](job)
}

// LockTablesArgs is the argument for LockTables.
type LockTablesArgs struct {
LockTables []TableLockTpInfo `json:"lock_tables,omitempty"`
Expand Down
58 changes: 58 additions & 0 deletions pkg/meta/model/job_args_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -736,6 +736,64 @@ func TestCheckConstraintArgs(t *testing.T) {
}
}

func TestGetAlterTablePlacementArgs(t *testing.T) {
inArgs := &AlterTablePlacementArgs{
PlacementPolicyRef: &PolicyRefInfo{
ID: 7527,
Name: model.NewCIStr("placement-policy"),
},
}
for _, v := range []JobVersion{JobVersion1, JobVersion2} {
j2 := &Job{}
require.NoError(t, j2.Decode(getJobBytes(t, inArgs, v, ActionAlterTablePlacement)))
args, err := GetAlterTablePlacementArgs(j2)
require.NoError(t, err)
require.Equal(t, inArgs, args)
}

// test PlacementPolicyRef is nil
inArgs = &AlterTablePlacementArgs{
PlacementPolicyRef: nil,
}
for _, v := range []JobVersion{JobVersion1, JobVersion2} {
j2 := &Job{}
require.NoError(t, j2.Decode(getJobBytes(t, inArgs, v, ActionAlterTablePlacement)))
args, err := GetAlterTablePlacementArgs(j2)
require.NoError(t, err)
require.Equal(t, inArgs, args)
}
}

func TestGetSetTiFlashReplicaArgs(t *testing.T) {
inArgs := &SetTiFlashReplicaArgs{
TiflashReplica: ast.TiFlashReplicaSpec{
Count: 3,
Labels: []string{"TiFlash1", "TiFlash2", "TiFlash3"},
Hypo: true,
},
}
for _, v := range []JobVersion{JobVersion1, JobVersion2} {
j2 := &Job{}
require.NoError(t, j2.Decode(getJobBytes(t, inArgs, v, ActionSetTiFlashReplica)))
args, err := GetSetTiFlashReplicaArgs(j2)
require.NoError(t, err)
require.Equal(t, inArgs, args)
}
}

func TestGetUpdateTiFlashReplicaStatusArgs(t *testing.T) {
inArgs := &UpdateTiFlashReplicaStatusArgs{
Available: true,
PhysicalID: 1001,
}
for _, v := range []JobVersion{JobVersion1, JobVersion2} {
j2 := &Job{}
require.NoError(t, j2.Decode(getJobBytes(t, inArgs, v, ActionUpdateTiFlashReplicaStatus)))
args, err := GetUpdateTiFlashReplicaStatusArgs(j2)
require.NoError(t, err)
require.Equal(t, inArgs, args)
}
}
func TestLockTableArgs(t *testing.T) {
inArgs := &LockTablesArgs{
LockTables: []TableLockTpInfo{{1, 1, model.TableLockNone}},
Expand Down