diff --git a/pkg/ddl/multi_schema_change.go b/pkg/ddl/multi_schema_change.go index 593daaf6eb7c4..3e063822e585b 100644 --- a/pkg/ddl/multi_schema_change.go +++ b/pkg/ddl/multi_schema_change.go @@ -184,6 +184,7 @@ func appendToSubJobs(m *model.MultiSchemaInfo, job *model.Job) error { reorgTp = job.ReorgMeta.ReorgTp } m.SubJobs = append(m.SubJobs, &model.SubJob{ + Version: job.Version, Type: job.Type, Args: job.Args, RawArgs: job.RawArgs, diff --git a/pkg/ddl/reorg_partition_test.go b/pkg/ddl/reorg_partition_test.go index 1287b304a95d0..36c7cd16012b4 100644 --- a/pkg/ddl/reorg_partition_test.go +++ b/pkg/ddl/reorg_partition_test.go @@ -524,7 +524,7 @@ func TestReorgPartitionRollback(t *testing.T) { // TODO: Check that there are no additional placement rules, // bundles, or ranges with non-completed tableIDs // (partitions used during reorg, but was dropped) - require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ddl/mockUpdateVersionAndTableInfoErr", `return(2)`)) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ddl/mockUpdateVersionAndTableInfoErr", `return(true)`)) tk.MustExecToErr("alter table t reorganize partition p1 into (partition p1a values less than (15), partition p1b values less than (20))") tk.MustExec(`admin check table t`) require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ddl/mockUpdateVersionAndTableInfoErr")) diff --git a/pkg/meta/model/job.go b/pkg/meta/model/job.go index 87b1ec8fdce80..1935f39d5e44e 100644 --- a/pkg/meta/model/job.go +++ b/pkg/meta/model/job.go @@ -490,49 +490,44 @@ func (job *Job) FillFinishedArgs(args FinishedJobArgs) { args.fillFinishedJob(job) } +func marshalArgs(jobVer JobVersion, args []any) (json.RawMessage, error) { + if len(args) <= 0 { + return nil, nil + } + + if jobVer == JobVersion1 { + rawArgs, err := json.Marshal(args) + if err != nil { + return nil, errors.Trace(err) + } + return rawArgs, nil + } + + intest.Assert(jobVer == JobVersion2, "job version is not v2") + intest.Assert(len(args) == 1, "Job.Args should have only one element") + rawArgs, err := json.Marshal(args[0]) + if err != nil { + return nil, errors.Trace(err) + } + return rawArgs, nil +} + // Encode encodes job with json format. // updateRawArgs is used to determine whether to update the raw args. func (job *Job) Encode(updateRawArgs bool) ([]byte, error) { var err error if updateRawArgs { - if job.Version == JobVersion1 { - job.RawArgs, err = json.Marshal(job.Args) - if err != nil { - return nil, errors.Trace(err) - } - if job.MultiSchemaInfo != nil { - for _, sub := range job.MultiSchemaInfo.SubJobs { - // Only update the args of executing sub-jobs. - if sub.Args == nil { - continue - } - sub.RawArgs, err = json.Marshal(sub.Args) - if err != nil { - return nil, errors.Trace(err) - } - } - } - } else { - var arg any - if len(job.Args) > 0 { - intest.Assert(len(job.Args) == 1, "Job.Args should have only one element") - arg = job.Args[0] - } - job.RawArgs, err = json.Marshal(arg) - if err != nil { - return nil, errors.Trace(err) - } - // TODO remember update sub-jobs' RawArgs when we do it. - if job.MultiSchemaInfo != nil { - for _, sub := range job.MultiSchemaInfo.SubJobs { - // Only update the args of executing sub-jobs. - if len(sub.Args) <= 0 { - continue - } - sub.RawArgs, err = json.Marshal(sub.Args[0]) - if err != nil { - return nil, errors.Trace(err) - } + job.RawArgs, err = marshalArgs(job.Version, job.Args) + if err != nil { + return nil, errors.Trace(err) + } + + if job.MultiSchemaInfo != nil { + for _, sub := range job.MultiSchemaInfo.SubJobs { + // Only update the args of executing sub-jobs. + sub.RawArgs, err = marshalArgs(sub.Version, sub.Args) + if err != nil { + return nil, errors.Trace(err) } } } @@ -873,6 +868,7 @@ func (job *Job) GetInvolvingSchemaInfo() []InvolvingSchemaInfo { // SubJob is a representation of one DDL schema change. A Job may contain zero // (when multi-schema change is not applicable) or more SubJobs. type SubJob struct { + Version JobVersion `json:"version"` Type ActionType `json:"type"` Args []any `json:"-"` RawArgs json.RawMessage `json:"raw_args"` @@ -909,6 +905,17 @@ func (sub *SubJob) IsFinished() bool { // ToProxyJob converts a sub-job to a proxy job. func (sub *SubJob) ToProxyJob(parentJob *Job, seq int) Job { + var jobVer JobVersion + // because in mock test case. the version = V1 in ActionMultiSchemaChange, but maybe the version = v2 in subjob. + // we should retain the version from subjob. + // to do: + // we should set Version = sub.Version, after refactor all of DDL type. + if sub.Version == JobVersion2 { + jobVer = JobVersion2 + } else { + jobVer = parentJob.Version + } + return Job{ ID: parentJob.ID, Type: sub.Type, @@ -931,7 +938,7 @@ func (sub *SubJob) ToProxyJob(parentJob *Job, seq int) Job { DependencyID: parentJob.DependencyID, Query: parentJob.Query, BinlogInfo: parentJob.BinlogInfo, - Version: parentJob.Version, + Version: jobVer, ReorgMeta: parentJob.ReorgMeta, MultiSchemaInfo: &MultiSchemaInfo{Revertible: sub.Revertible, Seq: int32(seq)}, Priority: parentJob.Priority, diff --git a/pkg/meta/model/job_args.go b/pkg/meta/model/job_args.go index a9e6081ce52d9..cdfceb4991ade 100644 --- a/pkg/meta/model/job_args.go +++ b/pkg/meta/model/job_args.go @@ -380,7 +380,7 @@ func GetDropColumnArgs(job *Job) (*DropColumnArgs, error) { partitionIDs []int64 ) - if job.Version == JobVersion1 { + if job.Version <= JobVersion1 { err := job.DecodeArgs(&colName, &ifExists, &indexIDs, &partitionIDs) if err != nil { return nil, errors.Trace(err)