Skip to content

Commit

Permalink
retain the job version from subJob in ActionMultiSchemaChange
Browse files Browse the repository at this point in the history
Signed-off-by: joccau <[email protected]>
  • Loading branch information
joccau committed Sep 16, 2024
1 parent 82abe40 commit 8bad48a
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 41 deletions.
1 change: 1 addition & 0 deletions pkg/ddl/multi_schema_change.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ func appendToSubJobs(m *model.MultiSchemaInfo, job *model.Job) error {
reorgTp = job.ReorgMeta.ReorgTp
}
m.SubJobs = append(m.SubJobs, &model.SubJob{
Version: job.Version,
Type: job.Type,
Args: job.Args,
RawArgs: job.RawArgs,
Expand Down
2 changes: 1 addition & 1 deletion pkg/ddl/reorg_partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -524,7 +524,7 @@ func TestReorgPartitionRollback(t *testing.T) {
// TODO: Check that there are no additional placement rules,
// bundles, or ranges with non-completed tableIDs
// (partitions used during reorg, but was dropped)
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ddl/mockUpdateVersionAndTableInfoErr", `return(2)`))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ddl/mockUpdateVersionAndTableInfoErr", `return(true)`))
tk.MustExecToErr("alter table t reorganize partition p1 into (partition p1a values less than (15), partition p1b values less than (20))")
tk.MustExec(`admin check table t`)
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ddl/mockUpdateVersionAndTableInfoErr"))
Expand Down
85 changes: 46 additions & 39 deletions pkg/meta/model/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -490,49 +490,44 @@ func (job *Job) FillFinishedArgs(args FinishedJobArgs) {
args.fillFinishedJob(job)
}

func marshalArgs(jobVer JobVersion, args []any) (json.RawMessage, error) {
if len(args) <= 0 {
return nil, nil
}

if jobVer == JobVersion1 {
rawArgs, err := json.Marshal(args)
if err != nil {
return nil, errors.Trace(err)
}
return rawArgs, nil
}

intest.Assert(jobVer == JobVersion2, "job version is not v2")
intest.Assert(len(args) == 1, "Job.Args should have only one element")
rawArgs, err := json.Marshal(args[0])
if err != nil {
return nil, errors.Trace(err)
}
return rawArgs, nil
}

// Encode encodes job with json format.
// updateRawArgs is used to determine whether to update the raw args.
func (job *Job) Encode(updateRawArgs bool) ([]byte, error) {
var err error
if updateRawArgs {
if job.Version == JobVersion1 {
job.RawArgs, err = json.Marshal(job.Args)
if err != nil {
return nil, errors.Trace(err)
}
if job.MultiSchemaInfo != nil {
for _, sub := range job.MultiSchemaInfo.SubJobs {
// Only update the args of executing sub-jobs.
if sub.Args == nil {
continue
}
sub.RawArgs, err = json.Marshal(sub.Args)
if err != nil {
return nil, errors.Trace(err)
}
}
}
} else {
var arg any
if len(job.Args) > 0 {
intest.Assert(len(job.Args) == 1, "Job.Args should have only one element")
arg = job.Args[0]
}
job.RawArgs, err = json.Marshal(arg)
if err != nil {
return nil, errors.Trace(err)
}
// TODO remember update sub-jobs' RawArgs when we do it.
if job.MultiSchemaInfo != nil {
for _, sub := range job.MultiSchemaInfo.SubJobs {
// Only update the args of executing sub-jobs.
if len(sub.Args) <= 0 {
continue
}
sub.RawArgs, err = json.Marshal(sub.Args[0])
if err != nil {
return nil, errors.Trace(err)
}
job.RawArgs, err = marshalArgs(job.Version, job.Args)
if err != nil {
return nil, errors.Trace(err)
}

if job.MultiSchemaInfo != nil {
for _, sub := range job.MultiSchemaInfo.SubJobs {
// Only update the args of executing sub-jobs.
sub.RawArgs, err = marshalArgs(sub.Version, sub.Args)
if err != nil {
return nil, errors.Trace(err)
}
}
}
Expand Down Expand Up @@ -873,6 +868,7 @@ func (job *Job) GetInvolvingSchemaInfo() []InvolvingSchemaInfo {
// SubJob is a representation of one DDL schema change. A Job may contain zero
// (when multi-schema change is not applicable) or more SubJobs.
type SubJob struct {
Version JobVersion `json:"version"`
Type ActionType `json:"type"`
Args []any `json:"-"`
RawArgs json.RawMessage `json:"raw_args"`
Expand Down Expand Up @@ -909,6 +905,17 @@ func (sub *SubJob) IsFinished() bool {

// ToProxyJob converts a sub-job to a proxy job.
func (sub *SubJob) ToProxyJob(parentJob *Job, seq int) Job {
var jobVer JobVersion
// because in mock test case. the version = V1 in ActionMultiSchemaChange, but maybe the version = v2 in subjob.
// we should retain the version from subjob.
// to do:
// we should set Version = sub.Version, after refactor all of DDL type.
if sub.Version == JobVersion2 {
jobVer = JobVersion2
} else {
jobVer = parentJob.Version
}

return Job{
ID: parentJob.ID,
Type: sub.Type,
Expand All @@ -931,7 +938,7 @@ func (sub *SubJob) ToProxyJob(parentJob *Job, seq int) Job {
DependencyID: parentJob.DependencyID,
Query: parentJob.Query,
BinlogInfo: parentJob.BinlogInfo,
Version: parentJob.Version,
Version: jobVer,
ReorgMeta: parentJob.ReorgMeta,
MultiSchemaInfo: &MultiSchemaInfo{Revertible: sub.Revertible, Seq: int32(seq)},
Priority: parentJob.Priority,
Expand Down
2 changes: 1 addition & 1 deletion pkg/meta/model/job_args.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ func GetDropColumnArgs(job *Job) (*DropColumnArgs, error) {
partitionIDs []int64
)

if job.Version == JobVersion1 {
if job.Version <= JobVersion1 {
err := job.DecodeArgs(&colName, &ifExists, &indexIDs, &partitionIDs)
if err != nil {
return nil, errors.Trace(err)
Expand Down

0 comments on commit 8bad48a

Please sign in to comment.