Skip to content

Commit

Permalink
ddl/ingest: refactor checkpoint manager (#54747)
Browse files Browse the repository at this point in the history
ref #54436
  • Loading branch information
tangenta authored Jul 24, 2024
1 parent d6ee4b8 commit e6e8f7f
Show file tree
Hide file tree
Showing 17 changed files with 231 additions and 206 deletions.
31 changes: 23 additions & 8 deletions pkg/ddl/backfilling.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/pingcap/tidb/pkg/ddl/logutil"
sess "github.com/pingcap/tidb/pkg/ddl/session"
ddlutil "github.com/pingcap/tidb/pkg/ddl/util"
"github.com/pingcap/tidb/pkg/disttask/operator"
"github.com/pingcap/tidb/pkg/expression"
exprctx "github.com/pingcap/tidb/pkg/expression/context"
"github.com/pingcap/tidb/pkg/kv"
Expand Down Expand Up @@ -689,6 +690,7 @@ func (dc *ddlCtx) runAddIndexInLocalIngestMode(
cpMgr, err := ingest.NewCheckpointManager(
ctx,
sessPool,
reorgInfo.PhysicalTableID,
job.ID,
indexIDs,
ingest.LitBackCtxMgr.EncodeJobSortPath(job.ID),
Expand All @@ -700,7 +702,6 @@ func (dc *ddlCtx) runAddIndexInLocalIngestMode(
zap.Error(err))
} else {
defer cpMgr.Close()
cpMgr.Reset(t.GetPhysicalID(), reorgInfo.StartKey, reorgInfo.EndKey)
bcCtx.AttachCheckpointManager(cpMgr)
}

Expand All @@ -723,10 +724,6 @@ func (dc *ddlCtx) runAddIndexInLocalIngestMode(
zap.Int64s("index IDs", indexIDs))
return errors.Trace(err)
}
// in happy path FinishAndUnregisterEngines will be called in pipe.Close. We can
// ignore the error here.
//nolint: errcheck
defer bcCtx.FinishAndUnregisterEngines()

pipe, err := NewAddIndexIngestPipeline(
opCtx,
Expand All @@ -748,13 +745,31 @@ func (dc *ddlCtx) runAddIndexInLocalIngestMode(
if err != nil {
return err
}
err = pipe.Execute()
err = executeAndClosePipeline(opCtx, pipe)
if err != nil {
err1 := bcCtx.FinishAndUnregisterEngines(ingest.OptCloseEngines)
if err1 != nil {
logutil.DDLIngestLogger().Error("unregister engine failed",
zap.Int64("jobID", job.ID),
zap.Error(err1),
zap.Int64s("index IDs", indexIDs))
}
return err
}
if cpMgr != nil {
cpMgr.AdvanceWatermark(true, true)
}
return bcCtx.FinishAndUnregisterEngines(ingest.OptCleanData | ingest.OptCheckDup)
}

func executeAndClosePipeline(ctx *OperatorCtx, pipe *operator.AsyncPipeline) error {
err := pipe.Execute()
if err != nil {
return err
}
err = pipe.Close()
if opCtx.OperatorErr() != nil {
return opCtx.OperatorErr()
if opErr := ctx.OperatorErr(); opErr != nil {
return opErr
}
return err
}
Expand Down
43 changes: 23 additions & 20 deletions pkg/ddl/backfilling_operators.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,13 +340,13 @@ func (src *TableScanTaskSource) Open() error {

// adjustStartKey adjusts the start key so that we can skip the ranges that have been processed
// according to the information of checkpoint manager.
func (src *TableScanTaskSource) adjustStartKey(start, end kv.Key) kv.Key {
func (src *TableScanTaskSource) adjustStartKey(start, end kv.Key) (adjusted kv.Key, done bool) {
if src.cpMgr == nil {
return start
return start, false
}
cpKey := src.cpMgr.LastProcessedKey()
if len(cpKey) == 0 {
return start
return start, false
}
if cpKey.Cmp(start) < 0 || cpKey.Cmp(end) > 0 {
logutil.Logger(src.ctx).Error("invalid checkpoint key",
Expand All @@ -357,16 +357,23 @@ func (src *TableScanTaskSource) adjustStartKey(start, end kv.Key) kv.Key {
if intest.InTest {
panic("invalid checkpoint key")
}
return start
return start, false
}
return cpKey.Next()
if cpKey.Cmp(end) == 0 {
return cpKey, true
}
return cpKey.Next(), false
}

func (src *TableScanTaskSource) generateTasks() error {
taskIDAlloc := newTaskIDAllocator()
defer src.sink.Finish()

startKey := src.adjustStartKey(src.startKey, src.endKey)
startKey, done := src.adjustStartKey(src.startKey, src.endKey)
if done {
// All table data are done.
return nil
}
for {
kvRanges, err := loadTableRanges(
src.ctx,
Expand Down Expand Up @@ -931,27 +938,23 @@ func (s *indexWriteResultSink) flush() error {
failpoint.Return(errors.New("mock flush error"))
})
flushed, imported, err := s.backendCtx.Flush(ingest.FlushModeForceFlushAndImport)
if err != nil {
logutil.Logger(s.ctx).Error("flush error",
zap.String("category", "ddl"), zap.Error(err))
return err
}
if s.cpMgr != nil {
// Try to advance watermark even if there is an error.
s.cpMgr.AdvanceWatermark(flushed, imported)
}
if err != nil {
msg := "flush error"
if flushed {
msg = "import error"
}
logutil.Logger(s.ctx).Error(msg, zap.String("category", "ddl"), zap.Error(err))
return err
}
return nil
}

func (s *indexWriteResultSink) Close() error {
err := s.errGroup.Wait()
// for local pipeline
if bc := s.backendCtx; bc != nil {
err2 := bc.FinishAndUnregisterEngines()
if err == nil {
err = err2
}
}
return err
return s.errGroup.Wait()
}

func (*indexWriteResultSink) String() string {
Expand Down
26 changes: 15 additions & 11 deletions pkg/ddl/backfilling_read_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,25 +109,29 @@ func (r *readIndexExecutor) RunSubtask(ctx context.Context, subtask *proto.Subta
defer opCtx.Cancel()
r.curRowCount.Store(0)

var pipe *operator.AsyncPipeline
if len(r.cloudStorageURI) > 0 {
pipe, err = r.buildExternalStorePipeline(opCtx, subtask.ID, sm, subtask.Concurrency)
} else {
pipe, err = r.buildLocalStorePipeline(opCtx, sm, subtask.Concurrency)
pipe, err := r.buildExternalStorePipeline(opCtx, subtask.ID, sm, subtask.Concurrency)
if err != nil {
return err
}
return executeAndClosePipeline(opCtx, pipe)
}

pipe, err := r.buildLocalStorePipeline(opCtx, sm, subtask.Concurrency)
if err != nil {
return err
}

err = pipe.Execute()
err = executeAndClosePipeline(opCtx, pipe)
if err != nil {
// For dist task local based ingest, checkpoint is unsupported.
// If there is an error we should keep local sort dir clean.
err1 := r.bc.FinishAndUnregisterEngines(ingest.OptCleanData)
if err1 != nil {
logutil.DDLLogger().Warn("read index executor unregister engine failed", zap.Error(err1))
}
return err
}
err = pipe.Close()
if opCtx.OperatorErr() != nil {
return opCtx.OperatorErr()
}
return err
return r.bc.FinishAndUnregisterEngines(ingest.OptCleanData | ingest.OptCheckDup)
}

func (r *readIndexExecutor) RealtimeSummary() *execute.SubtaskSummary {
Expand Down
29 changes: 2 additions & 27 deletions pkg/ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -960,28 +960,6 @@ func runIngestReorgJobDist(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job,

func runIngestReorgJob(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job,
tbl table.Table, allIndexInfos []*model.IndexInfo) (done bool, ver int64, err error) {
bc, ok := ingest.LitBackCtxMgr.Load(job.ID)
if ok && bc.Done() {
return true, 0, nil
}
ctx := tidblogutil.WithCategory(w.ctx, "ddl-ingest")
var discovery pd.ServiceDiscovery
if d != nil {
//nolint:forcetypeassert
discovery = d.store.(tikv.Storage).GetRegionCache().PDClient().GetServiceDiscovery()
}
hasUnique := false
for _, indexInfo := range allIndexInfos {
if indexInfo.Unique {
hasUnique = true
break
}
}
bc, err = ingest.LitBackCtxMgr.Register(ctx, job.ID, hasUnique, nil, discovery, job.ReorgMeta.ResourceGroupName)
if err != nil {
ver, err = convertAddIdxJob2RollbackJob(d, t, job, tbl.Meta(), allIndexInfos, err)
return false, ver, errors.Trace(err)
}
done, ver, err = runReorgJobAndHandleErr(w, d, t, job, tbl, allIndexInfos, false)
if err != nil {
if kv.ErrKeyExists.Equal(err) {
Expand All @@ -996,11 +974,8 @@ func runIngestReorgJob(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job,
}
return false, ver, errors.Trace(err)
}
if !done {
return false, ver, nil
}
bc.SetDone()
return true, ver, nil
failpoint.InjectCall("afterRunIngestReorgJob", job, done)
return done, ver, nil
}

func errorIsRetryable(err error, job *model.Job) bool {
Expand Down
2 changes: 1 addition & 1 deletion pkg/ddl/ingest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ go_test(
embed = [":ingest"],
flaky = True,
race = "on",
shard_count = 18,
shard_count = 20,
deps = [
"//pkg/config",
"//pkg/ddl",
Expand Down
16 changes: 1 addition & 15 deletions pkg/ddl/ingest/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,9 @@ type BackendCtx interface {
// are Register-ed before. It's safe to call it multiple times.
//
// FinishAndUnregisterEngines is only used in local disk based ingest.
FinishAndUnregisterEngines() error
FinishAndUnregisterEngines(opt UnregisterOpt) error

FlushController
Done() bool
SetDone()

AttachCheckpointManager(*CheckpointManager)
GetCheckpointManager() *CheckpointManager
Expand Down Expand Up @@ -99,7 +97,6 @@ type litBackendCtx struct {
ctx context.Context
cfg *lightning.Config
sysVars map[string]string
done bool

flushing atomic.Bool
timeOfLastFlush atomicutil.Time
Expand Down Expand Up @@ -300,7 +297,6 @@ func (bc *litBackendCtx) unsafeImportAndReset(ei *engineInfo) error {
zap.Int64("job ID", ei.jobID), zap.Int64("index ID", ei.indexID))
}
ei.openedEngine = nil
ei.closedEngine = nil
return err
}
return nil
Expand Down Expand Up @@ -331,16 +327,6 @@ func (bc *litBackendCtx) checkFlush(mode FlushMode) (shouldFlush bool, shouldImp
return shouldFlush, shouldImport
}

// Done returns true if the lightning backfill is done.
func (bc *litBackendCtx) Done() bool {
return bc.done
}

// SetDone sets the done flag.
func (bc *litBackendCtx) SetDone() {
bc.done = true
}

// AttachCheckpointManager attaches a checkpoint manager to the backend context.
func (bc *litBackendCtx) AttachCheckpointManager(mgr *CheckpointManager) {
bc.checkpointMgr = mgr
Expand Down
2 changes: 1 addition & 1 deletion pkg/ddl/ingest/backend_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ func (m *litBackendCtxMgr) Unregister(jobID int64) {
if !exist {
return
}
_ = bc.FinishAndUnregisterEngines()
_ = bc.FinishAndUnregisterEngines(OptCloseEngines)
bc.backend.Close()
m.memRoot.Release(structSizeBackendCtx)
m.memRoot.ReleaseWithTag(encodeBackendTag(jobID))
Expand Down
Loading

0 comments on commit e6e8f7f

Please sign in to comment.