Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] *: Move updateRawArgs into Job and SubJob #56351

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion br/pkg/stream/rewrite_meta_rawkv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -726,7 +726,7 @@ var (

func genFinishedJob(job *model.Job, args model.FinishedJobArgs) *model.Job {
job.FillFinishedArgs(args)
bytes, _ := job.Encode(true)
bytes, _ := job.Encode()
resJob := &model.Job{}
_ = resJob.Decode(bytes)
return resJob
Expand Down
4 changes: 2 additions & 2 deletions pkg/ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -1359,7 +1359,7 @@ func processJobs(
continue
}

err = updateDDLJob2Table(ctx, ns, job, false)
err = updateDDLJob2Table(ctx, ns, job)
if err != nil {
jobErrs[i] = err
continue
Expand Down Expand Up @@ -1457,7 +1457,7 @@ func processAllJobs(
continue
}

err = updateDDLJob2Table(ctx, ns, job, false)
err = updateDDLJob2Table(ctx, ns, job)
if err != nil {
jobErrs[job.ID] = err
continue
Expand Down
10 changes: 5 additions & 5 deletions pkg/ddl/ddl_history.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,18 +44,18 @@ const (
)

// AddHistoryDDLJob record the history job.
func AddHistoryDDLJob(ctx context.Context, sess *sess.Session, t *meta.Mutator, job *model.Job, updateRawArgs bool) error {
err := addHistoryDDLJob2Table(ctx, sess, job, updateRawArgs)
func AddHistoryDDLJob(ctx context.Context, sess *sess.Session, t *meta.Mutator, job *model.Job) error {
err := addHistoryDDLJob2Table(ctx, sess, job)
if err != nil {
logutil.DDLLogger().Info("failed to add DDL job to history table", zap.Error(err))
}
// we always add history DDL job to job list at this moment.
return t.AddHistoryDDLJob(job, updateRawArgs)
return t.AddHistoryDDLJob(job)
}

// addHistoryDDLJob2Table adds DDL job to history table.
func addHistoryDDLJob2Table(ctx context.Context, sess *sess.Session, job *model.Job, updateRawArgs bool) error {
b, err := job.Encode(updateRawArgs)
func addHistoryDDLJob2Table(ctx context.Context, sess *sess.Session, job *model.Job) error {
b, err := job.Encode()
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/ddl/ddl_history_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func TestDDLHistoryBasic(t *testing.T) {
t := meta.NewMutator(txn)
return ddl.AddHistoryDDLJob(context.Background(), sess, t, &model.Job{
ID: 1,
}, false)
})
})

require.NoError(t, err)
Expand All @@ -62,7 +62,7 @@ func TestDDLHistoryBasic(t *testing.T) {
t := meta.NewMutator(txn)
return ddl.AddHistoryDDLJob(context.Background(), sess, t, &model.Job{
ID: 2,
}, false)
})
})

require.NoError(t, err)
Expand Down
9 changes: 9 additions & 0 deletions pkg/ddl/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -3402,6 +3402,8 @@ func (e *executor) RenameColumn(ctx sessionctx.Context, ident ast.Ident, spec *a
CDCWriteSource: ctx.GetSessionVars().CDCWriteSource,
SQLMode: ctx.GetSessionVars().SQLMode,
}
// TODO(joechenrh): remove this later
job.UpdateRawArgs = true
err = e.DoDDLJob(ctx, job)
return errors.Trace(err)
}
Expand Down Expand Up @@ -4029,6 +4031,8 @@ func (e *executor) RenameIndex(ctx sessionctx.Context, ident ast.Ident, spec *as
SQLMode: ctx.GetSessionVars().SQLMode,
}

// TODO(joechenrh): remove this later
job.UpdateRawArgs = true
err = e.DoDDLJob(ctx, job)
return errors.Trace(err)
}
Expand Down Expand Up @@ -4598,6 +4602,9 @@ func (e *executor) CreatePrimaryKey(ctx sessionctx.Context, ti ast.Ident, indexN
}
job.ReorgMeta = reorgMeta

// TODO(joechenrh): remove this later
job.UpdateRawArgs = true

err = e.DoDDLJob(ctx, job)
return errors.Trace(err)
}
Expand Down Expand Up @@ -5222,6 +5229,8 @@ func (e *executor) dropIndex(ctx sessionctx.Context, ti ast.Ident, indexName pmo
SQLMode: ctx.GetSessionVars().SQLMode,
}

// TODO(joechenrh): remove this later
job.UpdateRawArgs = true
err = e.DoDDLJob(ctx, job)
return errors.Trace(err)
}
Expand Down
10 changes: 10 additions & 0 deletions pkg/ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -989,6 +989,9 @@ func (w *worker) onCreateIndex(jobCtx *jobContext, job *model.Job, isPK bool) (v
return ver, errors.Trace(err)
}

// TODO(joechenrh): remove this later
job.UpdateRawArgs = true

allIndexInfos := make([]*model.IndexInfo, 0, len(indexNames))
for i, indexName := range indexNames {
indexInfo, err := checkAndBuildIndexInfo(job, tblInfo, indexName, isPK, uniques[i], false, indexPartSpecifications[i], indexOption[i], hiddenCols[i])
Expand Down Expand Up @@ -1118,6 +1121,8 @@ SwitchIndexState:
isGlobal = append(isGlobal, indexInfo.Global)
}
job.Args = []any{allIndexIDs, ifExists, getPartitionIDs(tbl.Meta()), isGlobal}
// TODO(joechenrh): remove this later
job.UpdateRawArgs = true
// Finish this job.
job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo)
if !job.ReorgMeta.IsDistReorg && job.ReorgMeta.ReorgTp == model.ReorgTypeLitMerge {
Expand Down Expand Up @@ -1477,6 +1482,8 @@ func onDropIndex(jobCtx *jobContext, job *model.Job) (ver int64, _ error) {
job.Args = append(job.Args, idxIDs[0], getPartitionIDs(tblInfo), isVector)
}
}
// TODO(joechenrh): remove this later.
job.UpdateRawArgs = true
default:
return ver, errors.Trace(dbterror.ErrInvalidDDLState.GenWithStackByArgs("index", allIndexInfos[0].State))
}
Expand Down Expand Up @@ -1536,6 +1543,9 @@ func checkDropIndex(infoCache *infoschema.InfoCache, t *meta.Mutator, job *model
}
}

// TODO(joechenrh): remove this later.
job.UpdateRawArgs = true

indexInfos := make([]*model.IndexInfo, 0, len(indexNames))
for i, idxName := range indexNames {
indexInfo := tblInfo.FindIndexByName(idxName.L)
Expand Down
3 changes: 1 addition & 2 deletions pkg/ddl/job_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -660,9 +660,8 @@ func updateDDLJob2Table(
ctx context.Context,
se *sess.Session,
job *model.Job,
updateRawArgs bool,
) error {
b, err := job.Encode(updateRawArgs)
b, err := job.Encode()
if err != nil {
return err
}
Expand Down
5 changes: 4 additions & 1 deletion pkg/ddl/job_submitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -696,8 +696,10 @@ func insertDDLJobs2Table(ctx context.Context, se *sess.Session, jobWs ...*JobWra
if jobW.JobArgs != nil {
jobW.FillArgs(jobW.JobArgs)
}
// TODO(joechenrh): remove this later.
jobW.UpdateRawArgs = true
injectModifyJobArgFailPoint(jobWs)
b, err := jobW.Encode(true)
b, err := jobW.Encode()
if err != nil {
return err
}
Expand Down Expand Up @@ -785,6 +787,7 @@ func injectModifyJobArgFailPoint(jobWs []*JobWrapper) {
}
} else if len(job.Args) > 0 {
job.Args[0] = 1
job.UpdateRawArgs = true
}
}
}
Expand Down
47 changes: 11 additions & 36 deletions pkg/ddl/job_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,18 +247,14 @@ func (w *worker) handleUpdateJobError(jobCtx *jobContext, job *model.Job, err er
}

// updateDDLJob updates the DDL job information.
func (w *worker) updateDDLJob(jobCtx *jobContext, job *model.Job, updateRawArgs bool) error {
func (w *worker) updateDDLJob(_ *jobContext, job *model.Job) error {
failpoint.Inject("mockErrEntrySizeTooLarge", func(val failpoint.Value) {
if val.(bool) {
failpoint.Return(kv.ErrEntryTooLarge)
}
})

if !updateRawArgs {
jobCtx.logger.Info("meet something wrong before update DDL job, shouldn't update raw args",
zap.String("job", job.String()))
}
return errors.Trace(updateDDLJob2Table(w.ctx, w.sess, job, updateRawArgs))
return errors.Trace(updateDDLJob2Table(w.ctx, w.sess, job))
}

// registerMDLInfo registers metadata lock info.
Expand Down Expand Up @@ -363,6 +359,7 @@ func (w *worker) finishDDLJob(jobCtx *jobContext, job *model.Job) (err error) {
// it may be too large that it can not be added to the history queue, so
// delete its arguments
job.Args = nil
job.UpdateRawArgs = true
}
}
if err != nil {
Expand All @@ -376,16 +373,11 @@ func (w *worker) finishDDLJob(jobCtx *jobContext, job *model.Job) (err error) {
metaMut := jobCtx.metaMut
job.BinlogInfo.FinishedTS = metaMut.StartTS
jobCtx.logger.Info("finish DDL job", zap.String("job", job.String()))
updateRawArgs := true
if job.Type == model.ActionAddPrimaryKey && !job.IsCancelled() {
// ActionAddPrimaryKey needs to check the warnings information in job.Args.
// Notice: warnings is used to support non-strict mode.
updateRawArgs = false
}

job.SeqNum = w.seqAllocator.Add(1)
w.removeJobCtx(job)
failpoint.InjectCall("afterFinishDDLJob", job)
err = AddHistoryDDLJob(w.ctx, w.sess, metaMut, job, updateRawArgs)
err = AddHistoryDDLJob(w.ctx, w.sess, metaMut, job)
return errors.Trace(err)
}

Expand Down Expand Up @@ -551,7 +543,7 @@ func (w *worker) transitOneJobStep(jobCtx *jobContext, job *model.Job) (int64, e

// If running job meets error, we will save this error in job Error and retry
// later if the job is not cancelled.
schemaVer, updateRawArgs, runJobErr := w.runOneJobStep(jobCtx, job)
schemaVer, runJobErr := w.runOneJobStep(jobCtx, job)

failpoint.InjectCall("onJobRunAfter", job)

Expand Down Expand Up @@ -585,7 +577,7 @@ func (w *worker) transitOneJobStep(jobCtx *jobContext, job *model.Job) (int64, e
jobCtx.unlockSchemaVersion(job.ID)
return 0, err
}
err = w.updateDDLJob(jobCtx, job, updateRawArgs)
err = w.updateDDLJob(jobCtx, job)
if err = w.handleUpdateJobError(jobCtx, job, err); err != nil {
w.sess.Rollback()
jobCtx.unlockSchemaVersion(job.ID)
Expand Down Expand Up @@ -750,7 +742,7 @@ func (*worker) processJobPausingRequest(jobCtx *jobContext, job *model.Job) (isR
func (w *worker) runOneJobStep(
jobCtx *jobContext,
job *model.Job,
) (ver int64, updateRawArgs bool, err error) {
) (ver int64, err error) {
defer tidbutil.Recover(metrics.LabelDDLWorker, fmt.Sprintf("%s runOneJobStep", w),
func() {
w.countForPanic(jobCtx, job)
Expand All @@ -772,25 +764,19 @@ func (w *worker) runOneJobStep(

if job.IsCancelling() {
jobCtx.logger.Debug("cancel DDL job", zap.String("job", job.String()))
ver, err = convertJob2RollbackJob(w, jobCtx, job)
// if job is converted to rollback job, the job.Args may be changed for the
// rollback logic, so we let caller persist the new arguments.
updateRawArgs = job.IsRollingback()
return
return convertJob2RollbackJob(w, jobCtx, job)
}

isRunnable, err := w.processJobPausingRequest(jobCtx, job)
if !isRunnable {
return ver, false, err
return ver, err
}

// It would be better to do the positive check, but no idea to list all valid states here now.
if !job.IsRollingback() {
job.State = model.JobStateRunning
}

prevState := job.State

// For every type, `schema/table` modification and `job` modification are conducted
// in the one kv transaction. The `schema/table` modification can be always discarded
// by kv reset when meets an unhandled error, but the `job` modification can't.
Expand Down Expand Up @@ -928,22 +914,11 @@ func (w *worker) runOneJobStep(
err = dbterror.ErrInvalidDDLJob.GenWithStack("invalid ddl job type: %v", job.Type)
}

// there are too many job types, instead let every job type output its own
// updateRawArgs, we try to use these rules as a generalization:
//
// if job has no error, some arguments may be changed, there's no harm to update
// it.
updateRawArgs = err == nil
// if job changed from running to rolling back, arguments may be changed
if prevState == model.JobStateRunning && job.IsRollingback() {
updateRawArgs = true
}

// Save errors in job if any, so that others can know errors happened.
if err != nil {
err = w.countForError(jobCtx, job, err)
}
return ver, updateRawArgs, err
return ver, err
}

func loadDDLVars(w *worker) error {
Expand Down
13 changes: 13 additions & 0 deletions pkg/ddl/modify_column.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,8 @@ func rollbackModifyColumnJob(jobCtx *jobContext, tblInfo *model.TableInfo, job *
job.FinishTableJob(model.JobStateRollbackDone, model.StateNone, ver, tblInfo)
// For those column-type-change type which doesn't need reorg data, we should also mock the job args for delete range.
job.Args = []any{[]int64{}, []int64{}}
// TODO(joechenrh): remove this later.
job.UpdateRawArgs = true
return ver, nil
}

Expand Down Expand Up @@ -283,6 +285,8 @@ func rollbackModifyColumnJobWithData(jobCtx *jobContext, tblInfo *model.TableInf
job.FinishTableJob(model.JobStateRollbackDone, model.StateNone, ver, tblInfo)
// Reconstruct the job args to add the temporary index ids into delete range table.
job.Args = []any{changingIdxIDs, getPartitionIDs(tblInfo)}
// TODO(joechenrh): remove this later.
job.UpdateRawArgs = true
return ver, nil
}

Expand Down Expand Up @@ -342,6 +346,8 @@ func (w *worker) doModifyColumn(
job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo)
// For those column-type-change type which doesn't need reorg data, we should also mock the job args for delete range.
job.Args = []any{[]int64{}, []int64{}}
// TODO(joechenrh): remove this later.
job.UpdateRawArgs = true
return ver, nil
}

Expand Down Expand Up @@ -480,6 +486,8 @@ func (w *worker) doModifyColumnTypeWithData(
job.SchemaState = model.StateDeleteOnly
metrics.GetBackfillProgressByLabel(metrics.LblModifyColumn, job.SchemaName, tblInfo.Name.String()).Set(0)
job.Args = append(job.Args, changingCol, changingIdxs, rmIdxIDs)
// TODO(joechenrh): remove this later.
job.UpdateRawArgs = true
case model.StateDeleteOnly:
// Column from null to not null.
if !mysql.HasNotNullFlag(oldCol.GetFlag()) && mysql.HasNotNullFlag(changingCol.GetFlag()) {
Expand Down Expand Up @@ -544,6 +552,8 @@ func (w *worker) doModifyColumnTypeWithData(
job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo)
// Refactor the job args to add the old index ids into delete range table.
job.Args = []any{rmIdxIDs, getPartitionIDs(tblInfo)}
// TODO(joechenrh): remove this later.
job.UpdateRawArgs = true
modifyColumnEvent := notifier.NewModifyColumnEvent(tblInfo, []*model.ColumnInfo{changingCol})
asyncNotifyEvent(jobCtx, modifyColumnEvent, job)
default:
Expand Down Expand Up @@ -940,6 +950,9 @@ func GetModifiableColumnJob(
CDCWriteSource: sctx.GetSessionVars().CDCWriteSource,
SQLMode: sctx.GetSessionVars().SQLMode,
}

// TODO(joechenrh): remove this later.
job.UpdateRawArgs = true
return job, nil
}

Expand Down
Loading