ddl/ingest: refactor checkpoint manager #54747

Merged: 8 commits, Jul 24, 2024
Changes from 7 commits
31 changes: 23 additions & 8 deletions pkg/ddl/backfilling.go
@@ -30,6 +30,7 @@ import (
sess "github.com/pingcap/tidb/pkg/ddl/internal/session"
"github.com/pingcap/tidb/pkg/ddl/logutil"
ddlutil "github.com/pingcap/tidb/pkg/ddl/util"
"github.com/pingcap/tidb/pkg/disttask/operator"
"github.com/pingcap/tidb/pkg/expression"
exprctx "github.com/pingcap/tidb/pkg/expression/context"
"github.com/pingcap/tidb/pkg/kv"
@@ -689,6 +690,7 @@ func (dc *ddlCtx) runAddIndexInLocalIngestMode(
cpMgr, err := ingest.NewCheckpointManager(
ctx,
sessPool,
reorgInfo.PhysicalTableID,
job.ID,
indexIDs,
ingest.LitBackCtxMgr.EncodeJobSortPath(job.ID),
@@ -700,7 +702,6 @@ func (dc *ddlCtx) runAddIndexInLocalIngestMode(
zap.Error(err))
} else {
defer cpMgr.Close()
cpMgr.Reset(t.GetPhysicalID(), reorgInfo.StartKey, reorgInfo.EndKey)
bcCtx.AttachCheckpointManager(cpMgr)
}

@@ -723,10 +724,6 @@ func (dc *ddlCtx) runAddIndexInLocalIngestMode(
zap.Int64s("index IDs", indexIDs))
return errors.Trace(err)
}
// in happy path FinishAndUnregisterEngines will be called in pipe.Close. We can
// ignore the error here.
//nolint: errcheck
defer bcCtx.FinishAndUnregisterEngines()

pipe, err := NewAddIndexIngestPipeline(
opCtx,
@@ -748,13 +745,31 @@
if err != nil {
return err
}
err = pipe.Execute()
err = executeAndClosePipeline(opCtx, pipe)
if err != nil {
err1 := bcCtx.FinishAndUnregisterEngines(ingest.OptCloseEngines)
if err1 != nil {
logutil.DDLIngestLogger().Error("unregister engine failed",
zap.Int64("jobID", job.ID),
zap.Error(err1),
zap.Int64s("index IDs", indexIDs))
}
return err
}
if cpMgr != nil {
cpMgr.AdvanceWatermark(true, true)
}
return bcCtx.FinishAndUnregisterEngines(ingest.OptCleanData | ingest.OptCheckDup)
}

func executeAndClosePipeline(ctx *OperatorCtx, pipe *operator.AsyncPipeline) error {
err := pipe.Execute()
if err != nil {
return err
}
err = pipe.Close()
if opCtx.OperatorErr() != nil {
return opCtx.OperatorErr()
if opErr := ctx.OperatorErr(); opErr != nil {
return opErr
}
return err
}
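Taken together, the two hunks above define the new cleanup contract for local ingest: the unconditional deferred FinishAndUnregisterEngines call is gone, and the option passed to FinishAndUnregisterEngines depends on how the pipeline ended. A minimal sketch of that call pattern, using the names from this diff (the comments are one reading of the intent, not text from the patch):

if err := executeAndClosePipeline(opCtx, pipe); err != nil {
	// Failure path: only close the engines. Local sort data is kept so a
	// retry can presumably resume from the checkpoint manager's watermark.
	if err1 := bcCtx.FinishAndUnregisterEngines(ingest.OptCloseEngines); err1 != nil {
		logutil.DDLIngestLogger().Error("unregister engine failed", zap.Error(err1))
	}
	return err
}
if cpMgr != nil {
	cpMgr.AdvanceWatermark(true, true)
}
// Success path: clean local data and run the duplicate-key check.
return bcCtx.FinishAndUnregisterEngines(ingest.OptCleanData | ingest.OptCheckDup)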
43 changes: 23 additions & 20 deletions pkg/ddl/backfilling_operators.go
@@ -340,13 +340,13 @@ func (src *TableScanTaskSource) Open() error {

// adjustStartKey adjusts the start key so that we can skip the ranges that have been processed
// according to the information of checkpoint manager.
func (src *TableScanTaskSource) adjustStartKey(start, end kv.Key) kv.Key {
func (src *TableScanTaskSource) adjustStartKey(start, end kv.Key) (adjusted kv.Key, done bool) {
if src.cpMgr == nil {
return start
return start, false
}
cpKey := src.cpMgr.LastProcessedKey()
if len(cpKey) == 0 {
return start
return start, false
}
if cpKey.Cmp(start) < 0 || cpKey.Cmp(end) > 0 {
logutil.Logger(src.ctx).Error("invalid checkpoint key",
@@ -357,16 +357,23 @@ func (src *TableScanTaskSource) adjustStartKey(start, end kv.Key) kv.Key {
if intest.InTest {
panic("invalid checkpoint key")
}
return start
return start, false
}
return cpKey.Next()
if cpKey.Cmp(end) == 0 {
return cpKey, true
}
return cpKey.Next(), false
}

func (src *TableScanTaskSource) generateTasks() error {
taskIDAlloc := newTaskIDAllocator()
defer src.sink.Finish()

startKey := src.adjustStartKey(src.startKey, src.endKey)
startKey, done := src.adjustStartKey(src.startKey, src.endKey)
if done {
// All table data are done.
return nil
}
for {
kvRanges, err := loadTableRanges(
src.ctx,
@@ -931,27 +938,23 @@ func (s *indexWriteResultSink) flush() error {
failpoint.Return(errors.New("mock flush error"))
})
flushed, imported, err := s.backendCtx.Flush(ingest.FlushModeForceFlushAndImport)
if err != nil {
logutil.Logger(s.ctx).Error("flush error",
zap.String("category", "ddl"), zap.Error(err))
return err
}
if s.cpMgr != nil {
// Try to advance watermark even if there is an error.
s.cpMgr.AdvanceWatermark(flushed, imported)
}
if err != nil {
msg := "flush error"
if flushed {
msg = "import error"
}
logutil.Logger(s.ctx).Error(msg, zap.String("category", "ddl"), zap.Error(err))
return err
}
return nil
}

func (s *indexWriteResultSink) Close() error {
err := s.errGroup.Wait()
// for local pipeline
if bc := s.backendCtx; bc != nil {
err2 := bc.FinishAndUnregisterEngines()
if err == nil {
err = err2
}
}
return err
return s.errGroup.Wait()
}

func (*indexWriteResultSink) String() string {
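For reference, the new adjustStartKey contract in the hunk above can be summarized as follows, where start and end delimit the reorg key range and cpKey is cpMgr.LastProcessedKey() (one reading of the code, not text from the patch):

- no checkpoint manager, or cpKey is empty: return (start, false) and scan from the beginning;
- start <= cpKey < end: return (cpKey.Next(), false), skipping ranges that were already ingested;
- cpKey == end: return (end, true); all table data are done and generateTasks emits no tasks;
- cpKey outside [start, end]: log an invalid-checkpoint error (panic in test builds) and fall back to (start, false).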
26 changes: 15 additions & 11 deletions pkg/ddl/backfilling_read_index.go
@@ -109,25 +109,29 @@ func (r *readIndexExecutor) RunSubtask(ctx context.Context, subtask *proto.Subta
defer opCtx.Cancel()
r.curRowCount.Store(0)

var pipe *operator.AsyncPipeline
if len(r.cloudStorageURI) > 0 {
pipe, err = r.buildExternalStorePipeline(opCtx, subtask.ID, sm, subtask.Concurrency)
} else {
pipe, err = r.buildLocalStorePipeline(opCtx, sm, subtask.Concurrency)
pipe, err := r.buildExternalStorePipeline(opCtx, subtask.ID, sm, subtask.Concurrency)
if err != nil {
return err
}
return executeAndClosePipeline(opCtx, pipe)
}

pipe, err := r.buildLocalStorePipeline(opCtx, sm, subtask.Concurrency)
if err != nil {
return err
}

err = pipe.Execute()
err = executeAndClosePipeline(opCtx, pipe)
if err != nil {
// For dist task local based ingest, checkpoint is unsupported.
// If there is an error we should keep local sort dir clean.
err1 := r.bc.FinishAndUnregisterEngines(ingest.OptCleanData)
if err1 != nil {
logutil.DDLLogger().Warn("read index executor unregister engine failed", zap.Error(err1))
}
return err
}
err = pipe.Close()
if opCtx.OperatorErr() != nil {
return opCtx.OperatorErr()
}
return err
return r.bc.FinishAndUnregisterEngines(ingest.OptCleanData | ingest.OptCheckDup)
}

func (r *readIndexExecutor) RealtimeSummary() *execute.SubtaskSummary {
29 changes: 2 additions & 27 deletions pkg/ddl/index.go
@@ -954,28 +954,6 @@ func runIngestReorgJobDist(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job,

func runIngestReorgJob(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job,
tbl table.Table, allIndexInfos []*model.IndexInfo) (done bool, ver int64, err error) {
bc, ok := ingest.LitBackCtxMgr.Load(job.ID)
if ok && bc.Done() {
return true, 0, nil
}
ctx := tidblogutil.WithCategory(w.ctx, "ddl-ingest")
var discovery pd.ServiceDiscovery
if d != nil {
//nolint:forcetypeassert
discovery = d.store.(tikv.Storage).GetRegionCache().PDClient().GetServiceDiscovery()
}
hasUnique := false
for _, indexInfo := range allIndexInfos {
if indexInfo.Unique {
hasUnique = true
break
}
}
bc, err = ingest.LitBackCtxMgr.Register(ctx, job.ID, hasUnique, nil, discovery, job.ReorgMeta.ResourceGroupName)
if err != nil {
ver, err = convertAddIdxJob2RollbackJob(d, t, job, tbl.Meta(), allIndexInfos, err)
return false, ver, errors.Trace(err)
}
Comment on lines -974 to -978 (Contributor Author):
Will be registered in runAddIndexInLocalIngestMode. (A sketch of the new registration site follows after this file's diff.)

done, ver, err = runReorgJobAndHandleErr(w, d, t, job, tbl, allIndexInfos, false)
if err != nil {
if kv.ErrKeyExists.Equal(err) {
@@ -990,11 +968,8 @@ func runIngestReorgJob(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job,
}
return false, ver, errors.Trace(err)
}
if !done {
return false, ver, nil
}
bc.SetDone()
return true, ver, nil
failpoint.InjectCall("afterRunIngestReorgJob", job, done)
return done, ver, nil
}

func errorIsRetryable(err error, job *model.Job) bool {
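As the author's comment above notes, the backend context registration removed here is expected to move into runAddIndexInLocalIngestMode. A rough sketch of that registration site, reconstructed from the deleted lines (identifier names follow the old code; the exact placement, error handling, and the deferred Unregister are assumptions, not part of this diff):

hasUnique := false
for _, indexInfo := range allIndexInfos {
	if indexInfo.Unique {
		hasUnique = true
		break
	}
}
var discovery pd.ServiceDiscovery
if d != nil {
	//nolint:forcetypeassert
	discovery = d.store.(tikv.Storage).GetRegionCache().PDClient().GetServiceDiscovery()
}
bcCtx, err := ingest.LitBackCtxMgr.Register(
	ctx, job.ID, hasUnique, nil, discovery, job.ReorgMeta.ResourceGroupName)
if err != nil {
	return errors.Trace(err)
}
defer ingest.LitBackCtxMgr.Unregister(job.ID)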
2 changes: 1 addition & 1 deletion pkg/ddl/ingest/BUILD.bazel
@@ -73,7 +73,7 @@ go_test(
embed = [":ingest"],
flaky = True,
race = "on",
shard_count = 18,
shard_count = 20,
deps = [
"//pkg/config",
"//pkg/ddl",
16 changes: 1 addition & 15 deletions pkg/ddl/ingest/backend.go
@@ -58,11 +58,9 @@ type BackendCtx interface {
// are Register-ed before. It's safe to call it multiple times.
//
// FinishAndUnregisterEngines is only used in local disk based ingest.
FinishAndUnregisterEngines() error
FinishAndUnregisterEngines(opt UnregisterOpt) error

FlushController
Done() bool
SetDone()

AttachCheckpointManager(*CheckpointManager)
GetCheckpointManager() *CheckpointManager
@@ -99,7 +97,6 @@ type litBackendCtx struct {
ctx context.Context
cfg *lightning.Config
sysVars map[string]string
done bool

flushing atomic.Bool
timeOfLastFlush atomicutil.Time
@@ -300,7 +297,6 @@ func (bc *litBackendCtx) unsafeImportAndReset(ei *engineInfo) error {
zap.Int64("job ID", ei.jobID), zap.Int64("index ID", ei.indexID))
}
ei.openedEngine = nil
ei.closedEngine = nil
return err
}
return nil
@@ -331,16 +327,6 @@ func (bc *litBackendCtx) checkFlush(mode FlushMode) (shouldFlush bool, shouldImport
return shouldFlush, shouldImport
}

// Done returns true if the lightning backfill is done.
func (bc *litBackendCtx) Done() bool {
return bc.done
}

// SetDone sets the done flag.
func (bc *litBackendCtx) SetDone() {
bc.done = true
}

// AttachCheckpointManager attaches a checkpoint manager to the backend context.
func (bc *litBackendCtx) AttachCheckpointManager(mgr *CheckpointManager) {
bc.checkpointMgr = mgr
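FinishAndUnregisterEngines now takes an UnregisterOpt, but the option type itself is not shown in this diff. Judging from the call sites above (OptCloseEngines on its own, and OptCleanData | OptCheckDup combined with a bitwise OR), a plausible bit-flag definition looks like the sketch below; treat it as an illustration, not the actual declaration in pkg/ddl/ingest:

type UnregisterOpt int

const (
	// OptCloseEngines closes the engines without removing local sort data.
	OptCloseEngines UnregisterOpt = 1 << iota
	// OptCleanData removes the local sort data after the engines are closed.
	OptCleanData
	// OptCheckDup runs the duplicate-key check for unique indexes before cleanup.
	OptCheckDup
)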
2 changes: 1 addition & 1 deletion pkg/ddl/ingest/backend_mgr.go
@@ -231,7 +231,7 @@ func (m *litBackendCtxMgr) Unregister(jobID int64) {
if !exist {
return
}
_ = bc.FinishAndUnregisterEngines()
_ = bc.FinishAndUnregisterEngines(OptCloseEngines)
bc.backend.Close()
m.memRoot.Release(structSizeBackendCtx)
m.memRoot.ReleaseWithTag(encodeBackendTag(jobID))