Skip to content

Commit

Permalink
meta: separate reader and mutator (#56376)
Browse files Browse the repository at this point in the history
ref #54436
  • Loading branch information
D3Hunter authored Sep 27, 2024
1 parent c56694c commit b427e33
Show file tree
Hide file tree
Showing 86 changed files with 652 additions and 623 deletions.
41 changes: 5 additions & 36 deletions br/pkg/backup/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -725,7 +725,7 @@ func BuildBackupRangeAndInitSchema(
buildRange bool,
) ([]rtree.Range, *Schemas, []*backuppb.PlacementPolicy, error) {
snapshot := storage.GetSnapshot(kv.NewVersion(backupTS))
m := meta.NewSnapshotMeta(snapshot)
m := meta.NewReader(snapshot)

var policies []*backuppb.PlacementPolicy
if isFullBackup {
Expand Down Expand Up @@ -821,7 +821,7 @@ func BuildBackupSchemas(
fn func(dbInfo *model.DBInfo, tableInfo *model.TableInfo),
) error {
snapshot := storage.GetSnapshot(kv.NewVersion(backupTS))
m := meta.NewSnapshotMeta(snapshot)
m := meta.NewReader(snapshot)

dbs, err := m.ListDatabases()
if err != nil {
Expand Down Expand Up @@ -936,37 +936,6 @@ func BuildBackupSchemas(
return nil
}

// BuildFullSchema builds full backup schemas for all databases and tables in
// the snapshot at backupTS, invoking fn once per table and once with a nil
// table for each empty database.
func BuildFullSchema(storage kv.Storage, backupTS uint64, fn func(dbInfo *model.DBInfo, tableInfo *model.TableInfo)) error {
	snapshot := storage.GetSnapshot(kv.NewVersion(backupTS))
	// This function only reads meta, so use the read-only Reader, consistent
	// with BuildBackupRangeAndInitSchema and BuildBackupSchemas.
	m := meta.NewReader(snapshot)

	dbs, err := m.ListDatabases()
	if err != nil {
		return errors.Trace(err)
	}

	for _, db := range dbs {
		hasTable := false
		err = m.IterTables(db.ID, func(table *model.TableInfo) error {
			// Visit every table of this database.
			fn(db, table)
			hasTable = true
			return nil
		})
		if err != nil {
			return errors.Trace(err)
		}

		// Back up this db even if it has no tables; fn receives a nil table
		// so callers can record the empty schema.
		if !hasTable {
			fn(db, nil)
		}
	}

	return nil
}

func skipUnsupportedDDLJob(job *model.Job) bool {
switch job.Type {
// TiDB V5.3.0 supports TableAttributes and TablePartitionAttributes.
Expand All @@ -988,9 +957,9 @@ func skipUnsupportedDDLJob(job *model.Job) bool {
// WriteBackupDDLJobs sends the ddl jobs that are done in (lastBackupTS, backupTS] to metaWriter.
func WriteBackupDDLJobs(metaWriter *metautil.MetaWriter, g glue.Glue, store kv.Storage, lastBackupTS, backupTS uint64, needDomain bool) error {
snapshot := store.GetSnapshot(kv.NewVersion(backupTS))
snapMeta := meta.NewSnapshotMeta(snapshot)
snapMeta := meta.NewReader(snapshot)
lastSnapshot := store.GetSnapshot(kv.NewVersion(lastBackupTS))
lastSnapMeta := meta.NewSnapshotMeta(lastSnapshot)
lastSnapMeta := meta.NewReader(lastSnapshot)
lastSchemaVersion, err := lastSnapMeta.GetSchemaVersionWithNonEmptyDiff()
if err != nil {
return errors.Trace(err)
Expand Down Expand Up @@ -1033,7 +1002,7 @@ func WriteBackupDDLJobs(metaWriter *metautil.MetaWriter, g glue.Glue, store kv.S
return appendJobs, false
}

newestMeta := meta.NewSnapshotMeta(store.GetSnapshot(kv.NewVersion(version.Ver)))
newestMeta := meta.NewReader(store.GetSnapshot(kv.NewVersion(version.Ver)))
var allJobs []*model.Job
err = g.UseOneShotSession(store, !needDomain, func(se glue.Session) error {
allJobs, err = ddl.GetAllDDLJobs(context.Background(), se.GetSessionCtx())
Expand Down
10 changes: 5 additions & 5 deletions br/pkg/restore/ingestrec/ingest_recorder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,11 @@ func hasOneItem(idxID int64, columnList string, columnArgs []any) (iterateFunc,
}, &count
}

func createMeta(t *testing.T, store kv.Storage, fn func(m *meta.Meta)) {
func createMeta(t *testing.T, store kv.Storage, fn func(m *meta.Mutator)) {
txn, err := store.Begin()
require.NoError(t, err)

fn(meta.NewMeta(txn))
fn(meta.NewMutator(txn))

err = txn.Commit(context.Background())
require.NoError(t, err)
Expand All @@ -117,7 +117,7 @@ func TestAddIngestRecorder(t *testing.T) {
require.NoError(t, store.Close())
}()

createMeta(t, store, func(m *meta.Meta) {
createMeta(t, store, func(m *meta.Mutator) {
dbInfo := &model.DBInfo{
ID: 1,
Name: pmodel.NewCIStr(SchemaName),
Expand Down Expand Up @@ -300,7 +300,7 @@ func TestIndexesKind(t *testing.T) {
require.NoError(t, err)
_, err := se.ExecuteInternal(ctx)
*/
createMeta(t, store, func(m *meta.Meta) {
createMeta(t, store, func(m *meta.Mutator) {
dbInfo := &model.DBInfo{
ID: 1,
Name: pmodel.NewCIStr(SchemaName),
Expand Down Expand Up @@ -409,7 +409,7 @@ func TestRewriteTableID(t *testing.T) {
require.NoError(t, store.Close())
}()

createMeta(t, store, func(m *meta.Meta) {
createMeta(t, store, func(m *meta.Mutator) {
dbInfo := &model.DBInfo{
ID: 1,
Name: pmodel.NewCIStr(SchemaName),
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/restore/internal/prealloc_db/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ func cloneTableInfos(
ctx = kv.WithInternalSourceType(ctx, kv.InternalTxnBR)
tableInfos = make([]*metautil.Table, 0, len(originTableInfos))
err := kv.RunInNewTxn(ctx, dom.Store(), true, func(_ context.Context, txn kv.Transaction) error {
allocater := meta.NewMeta(txn)
allocater := meta.NewMutator(txn)
id, e := allocater.GetGlobalID()
if e != nil {
return e
Expand Down
6 changes: 3 additions & 3 deletions br/pkg/restore/log_client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1167,7 +1167,7 @@ func (rc *LogClient) GenGlobalID(ctx context.Context) (int64, error) {
true,
func(ctx context.Context, txn kv.Transaction) error {
var e error
t := meta.NewMeta(txn)
t := meta.NewMutator(txn)
id, e = t.GenGlobalID()
return e
})
Expand All @@ -1187,7 +1187,7 @@ func (rc *LogClient) GenGlobalIDs(ctx context.Context, n int) ([]int64, error) {
true,
func(ctx context.Context, txn kv.Transaction) error {
var e error
t := meta.NewMeta(txn)
t := meta.NewMutator(txn)
ids, e = t.GenGlobalIDs(n)
return e
})
Expand All @@ -1206,7 +1206,7 @@ func (rc *LogClient) UpdateSchemaVersion(ctx context.Context) error {
storage,
true,
func(ctx context.Context, txn kv.Transaction) error {
t := meta.NewMeta(txn)
t := meta.NewMutator(txn)
var e error
// To trigger full-reload instead of diff-reload, we need to increase the schema version
// by at least `domain.LoadSchemaDiffVersionGapThreshold`.
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/restore/misc.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ const maxUserTablesNum = 10
// AssertUserDBsEmpty checks whether user dbs exist in the cluster
func AssertUserDBsEmpty(dom *domain.Domain) error {
databases := dom.InfoSchema().AllSchemas()
m := meta.NewSnapshotMeta(dom.Store().GetSnapshot(kv.MaxVersion))
m := meta.NewReader(dom.Store().GetSnapshot(kv.MaxVersion))
userTables := make([]string, 0, maxUserTablesNum+1)
appendTables := func(dbName, tableName string) bool {
if len(userTables) >= maxUserTablesNum {
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/restore/snap_client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ func (rc *SnapClient) AllocTableIDs(ctx context.Context, tables []*metautil.Tabl
preallocedTableIDs := tidalloc.New(tables)
ctx = kv.WithInternalSourceType(ctx, kv.InternalTxnBR)
err := kv.RunInNewTxn(ctx, rc.GetDomain().Store(), true, func(_ context.Context, txn kv.Transaction) error {
return preallocedTableIDs.Alloc(meta.NewMeta(txn))
return preallocedTableIDs.Alloc(meta.NewMutator(txn))
})
if err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/stream/stream_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func buildObserveTableRanges(
backupTS uint64,
) ([]kv.KeyRange, error) {
snapshot := storage.GetSnapshot(kv.NewVersion(backupTS))
m := meta.NewSnapshotMeta(snapshot)
m := meta.NewReader(snapshot)

dbs, err := m.ListDatabases()
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion lightning/pkg/importer/meta_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func newTableRestore(t *testing.T,

ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnLightning)
err = kv.RunInNewTxn(ctx, kvStore, false, func(ctx context.Context, txn kv.Transaction) error {
m := meta.NewMeta(txn)
m := meta.NewMutator(txn)
if err := m.CreateDatabase(&model.DBInfo{ID: dbInfo.ID}); err != nil && !errors.ErrorEqual(err, meta.ErrDBExists) {
return err
}
Expand Down
12 changes: 6 additions & 6 deletions pkg/autoid_service/autoid.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func (alloc *autoIDValue) alloc4Unsigned(ctx context.Context, store kv.Storage,

ctx = kv.WithInternalSourceType(ctx, kv.InternalTxnMeta)
err := kv.RunInNewTxn(ctx, store, true, func(_ context.Context, txn kv.Transaction) error {
idAcc := meta.NewMeta(txn).GetAutoIDAccessors(dbID, tblID).IncrementID(model.TableInfoVersion5)
idAcc := meta.NewMutator(txn).GetAutoIDAccessors(dbID, tblID).IncrementID(model.TableInfoVersion5)
var err1 error
newBase, err1 = idAcc.Get()
if err1 != nil {
Expand Down Expand Up @@ -156,7 +156,7 @@ func (alloc *autoIDValue) alloc4Signed(ctx context.Context,

ctx = kv.WithInternalSourceType(ctx, kv.InternalTxnMeta)
err := kv.RunInNewTxn(ctx, store, true, func(_ context.Context, txn kv.Transaction) error {
idAcc := meta.NewMeta(txn).GetAutoIDAccessors(dbID, tblID).IncrementID(model.TableInfoVersion5)
idAcc := meta.NewMutator(txn).GetAutoIDAccessors(dbID, tblID).IncrementID(model.TableInfoVersion5)
var err1 error
newBase, err1 = idAcc.Get()
if err1 != nil {
Expand Down Expand Up @@ -222,7 +222,7 @@ func (alloc *autoIDValue) rebase4Unsigned(ctx context.Context,
startTime := time.Now()
ctx = kv.WithInternalSourceType(ctx, kv.InternalTxnMeta)
err := kv.RunInNewTxn(ctx, store, true, func(_ context.Context, txn kv.Transaction) error {
idAcc := meta.NewMeta(txn).GetAutoIDAccessors(dbID, tblID).IncrementID(model.TableInfoVersion5)
idAcc := meta.NewMutator(txn).GetAutoIDAccessors(dbID, tblID).IncrementID(model.TableInfoVersion5)
currentEnd, err1 := idAcc.Get()
if err1 != nil {
return err1
Expand Down Expand Up @@ -264,7 +264,7 @@ func (alloc *autoIDValue) rebase4Signed(ctx context.Context, store kv.Storage, d
startTime := time.Now()
ctx = kv.WithInternalSourceType(ctx, kv.InternalTxnMeta)
err := kv.RunInNewTxn(ctx, store, true, func(_ context.Context, txn kv.Transaction) error {
idAcc := meta.NewMeta(txn).GetAutoIDAccessors(dbID, tblID).IncrementID(model.TableInfoVersion5)
idAcc := meta.NewMutator(txn).GetAutoIDAccessors(dbID, tblID).IncrementID(model.TableInfoVersion5)
currentEnd, err1 := idAcc.Get()
if err1 != nil {
return err1
Expand Down Expand Up @@ -483,7 +483,7 @@ func (s *Service) allocAutoID(ctx context.Context, req *autoid.AutoIDRequest) (*
var currentEnd int64
ctx = kv.WithInternalSourceType(ctx, kv.InternalTxnMeta)
err := kv.RunInNewTxn(ctx, s.store, true, func(_ context.Context, txn kv.Transaction) error {
idAcc := meta.NewMeta(txn).GetAutoIDAccessors(req.DbID, req.TblID).IncrementID(model.TableInfoVersion5)
idAcc := meta.NewMutator(txn).GetAutoIDAccessors(req.DbID, req.TblID).IncrementID(model.TableInfoVersion5)
var err1 error
currentEnd, err1 = idAcc.Get()
if err1 != nil {
Expand Down Expand Up @@ -523,7 +523,7 @@ func (alloc *autoIDValue) forceRebase(ctx context.Context, store kv.Storage, dbI
ctx = kv.WithInternalSourceType(ctx, kv.InternalTxnMeta)
var oldValue int64
err := kv.RunInNewTxn(ctx, store, true, func(_ context.Context, txn kv.Transaction) error {
idAcc := meta.NewMeta(txn).GetAutoIDAccessors(dbID, tblID).IncrementID(model.TableInfoVersion5)
idAcc := meta.NewMutator(txn).GetAutoIDAccessors(dbID, tblID).IncrementID(model.TableInfoVersion5)
currentEnd, err1 := idAcc.Get()
if err1 != nil {
return err1
Expand Down
2 changes: 1 addition & 1 deletion pkg/ddl/add_column.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ import (
"go.uber.org/zap"
)

func onAddColumn(jobCtx *jobContext, t *meta.Meta, job *model.Job) (ver int64, err error) {
func onAddColumn(jobCtx *jobContext, t *meta.Mutator, job *model.Job) (ver int64, err error) {
// Handle the rolling back job.
if job.IsRollingback() {
ver, err = onDropColumn(jobCtx, t, job)
Expand Down
2 changes: 1 addition & 1 deletion pkg/ddl/backfilling_dist_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ func (sch *LitBackfillScheduler) Close() {

func getTblInfo(ctx context.Context, d *ddl, job *model.Job) (tblInfo *model.TableInfo, err error) {
err = kv.RunInNewTxn(ctx, d.store, true, func(_ context.Context, txn kv.Transaction) error {
tblInfo, err = meta.NewMeta(txn).GetTable(job.SchemaID, job.TableID)
tblInfo, err = meta.NewMutator(txn).GetTable(job.SchemaID, job.TableID)
return err
})
if err != nil {
Expand Down
10 changes: 5 additions & 5 deletions pkg/ddl/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ func isFlashbackSupportedDDLAction(action model.ActionType) bool {
}
}

func checkSystemSchemaID(t *meta.Meta, schemaID int64, flashbackTSString string) error {
func checkSystemSchemaID(t meta.Reader, schemaID int64, flashbackTSString string) error {
if schemaID <= 0 {
return nil
}
Expand All @@ -220,7 +220,7 @@ func checkSystemSchemaID(t *meta.Meta, schemaID int64, flashbackTSString string)
return nil
}

func checkAndSetFlashbackClusterInfo(ctx context.Context, se sessionctx.Context, store kv.Storage, t *meta.Meta, job *model.Job, flashbackTS uint64) (err error) {
func checkAndSetFlashbackClusterInfo(ctx context.Context, se sessionctx.Context, store kv.Storage, t *meta.Mutator, job *model.Job, flashbackTS uint64) (err error) {
if err = ValidateFlashbackTS(ctx, se, flashbackTS); err != nil {
return err
}
Expand All @@ -246,7 +246,7 @@ func checkAndSetFlashbackClusterInfo(ctx context.Context, se sessionctx.Context,
return errors.Trace(err)
}

flashbackSnapshotMeta := meta.NewSnapshotMeta(store.GetSnapshot(kv.NewVersion(flashbackTS)))
flashbackSnapshotMeta := meta.NewReader(store.GetSnapshot(kv.NewVersion(flashbackTS)))
flashbackSchemaVersion, err := flashbackSnapshotMeta.GetSchemaVersion()
if err != nil {
return errors.Trace(err)
Expand Down Expand Up @@ -402,7 +402,7 @@ func getFlashbackKeyRanges(ctx context.Context, sess sessionctx.Context, flashba
keyRanges := make([]kv.KeyRange, 0)

// get snapshot schema IDs.
flashbackSnapshotMeta := meta.NewSnapshotMeta(sess.GetStore().GetSnapshot(kv.NewVersion(flashbackTS)))
flashbackSnapshotMeta := meta.NewReader(sess.GetStore().GetSnapshot(kv.NewVersion(flashbackTS)))
snapshotSchemas, err := flashbackSnapshotMeta.ListDatabases()
if err != nil {
return nil, errors.Trace(err)
Expand Down Expand Up @@ -700,7 +700,7 @@ func splitRegionsByKeyRanges(ctx context.Context, store kv.Storage, keyRanges []
// 2. before flashback start, check timestamp, disable GC and close PD schedule, get flashback key ranges.
// 3. phase 1, lock flashback key ranges.
// 4. phase 2, send flashback RPC, do flashback jobs.
func (w *worker) onFlashbackCluster(jobCtx *jobContext, t *meta.Meta, job *model.Job) (ver int64, err error) {
func (w *worker) onFlashbackCluster(jobCtx *jobContext, t *meta.Mutator, job *model.Job) (ver int64, err error) {
inFlashbackTest := false
failpoint.Inject("mockFlashbackTest", func(val failpoint.Value) {
if val.(bool) {
Expand Down
14 changes: 7 additions & 7 deletions pkg/ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func InitAndAddColumnToTable(tblInfo *model.TableInfo, colInfo *model.ColumnInfo
return colInfo
}

func checkAddColumn(t *meta.Meta, job *model.Job) (*model.TableInfo, *model.ColumnInfo, *model.ColumnInfo,
func checkAddColumn(t *meta.Mutator, job *model.Job) (*model.TableInfo, *model.ColumnInfo, *model.ColumnInfo,
*ast.ColumnPosition, bool /* ifNotExists */, error) {
schemaID := job.SchemaID
tblInfo, err := GetTableInfoAndCancelFaultJob(t, job, schemaID)
Expand Down Expand Up @@ -138,7 +138,7 @@ func checkDropColumnForStatePublic(colInfo *model.ColumnInfo) (err error) {
return nil
}

func onDropColumn(jobCtx *jobContext, t *meta.Meta, job *model.Job) (ver int64, _ error) {
func onDropColumn(jobCtx *jobContext, t *meta.Mutator, job *model.Job) (ver int64, _ error) {
tblInfo, colInfo, idxInfos, ifExists, err := checkDropColumn(jobCtx, t, job)
if err != nil {
if ifExists && dbterror.ErrCantDropFieldOrKey.Equal(err) {
Expand Down Expand Up @@ -234,7 +234,7 @@ func onDropColumn(jobCtx *jobContext, t *meta.Meta, job *model.Job) (ver int64,
return ver, errors.Trace(err)
}

func checkDropColumn(jobCtx *jobContext, t *meta.Meta, job *model.Job) (*model.TableInfo, *model.ColumnInfo, []*model.IndexInfo, bool /* ifExists */, error) {
func checkDropColumn(jobCtx *jobContext, t *meta.Mutator, job *model.Job) (*model.TableInfo, *model.ColumnInfo, []*model.IndexInfo, bool /* ifExists */, error) {
schemaID := job.SchemaID
tblInfo, err := GetTableInfoAndCancelFaultJob(t, job, schemaID)
if err != nil {
Expand Down Expand Up @@ -291,7 +291,7 @@ func isDroppableColumn(tblInfo *model.TableInfo, colName pmodel.CIStr) error {
return nil
}

func onSetDefaultValue(jobCtx *jobContext, t *meta.Meta, job *model.Job) (ver int64, _ error) {
func onSetDefaultValue(jobCtx *jobContext, t *meta.Mutator, job *model.Job) (ver int64, _ error) {
newCol := &model.ColumnInfo{}
err := job.DecodeArgs(newCol)
if err != nil {
Expand Down Expand Up @@ -896,7 +896,7 @@ func updateChangingObjState(changingCol *model.ColumnInfo, changingIdxs []*model
}
}

func checkAndApplyAutoRandomBits(jobCtx *jobContext, m *meta.Meta, dbInfo *model.DBInfo, tblInfo *model.TableInfo,
func checkAndApplyAutoRandomBits(jobCtx *jobContext, m *meta.Mutator, dbInfo *model.DBInfo, tblInfo *model.TableInfo,
oldCol *model.ColumnInfo, newCol *model.ColumnInfo, newAutoRandBits uint64) error {
if newAutoRandBits == 0 {
return nil
Expand Down Expand Up @@ -967,7 +967,7 @@ func (r *asAutoIDRequirement) AutoIDClient() *autoid.ClientDiscover {

// applyNewAutoRandomBits set auto_random bits to TableInfo and
// migrate auto_increment ID to auto_random ID if possible.
func applyNewAutoRandomBits(jobCtx *jobContext, m *meta.Meta, dbInfo *model.DBInfo,
func applyNewAutoRandomBits(jobCtx *jobContext, m *meta.Mutator, dbInfo *model.DBInfo,
tblInfo *model.TableInfo, oldCol *model.ColumnInfo, newAutoRandBits uint64) error {
tblInfo.AutoRandomBits = newAutoRandBits
needMigrateFromAutoIncToAutoRand := mysql.HasAutoIncrementFlag(oldCol.GetFlag())
Expand Down Expand Up @@ -1037,7 +1037,7 @@ func checkForNullValue(ctx context.Context, sctx sessionctx.Context, isDataTrunc
return nil
}

func updateColumnDefaultValue(jobCtx *jobContext, t *meta.Meta, job *model.Job, newCol *model.ColumnInfo, oldColName *pmodel.CIStr) (ver int64, _ error) {
func updateColumnDefaultValue(jobCtx *jobContext, t *meta.Mutator, job *model.Job, newCol *model.ColumnInfo, oldColName *pmodel.CIStr) (ver int64, _ error) {
tblInfo, err := GetTableInfoAndCancelFaultJob(t, job, job.SchemaID)
if err != nil {
return ver, errors.Trace(err)
Expand Down
6 changes: 3 additions & 3 deletions pkg/ddl/column_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func TestModifyAutoRandColumnWithMetaKeyChanged(t *testing.T) {
atomic.AddInt32(&errCount, -1)
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnBackfillDDLPrefix+ddl.DDLBackfillers[model.ActionModifyColumn])
genAutoRandErr = kv.RunInNewTxn(ctx, store, false, func(ctx context.Context, txn kv.Transaction) error {
t := meta.NewMeta(txn)
t := meta.NewMutator(txn)
_, err1 := t.GetAutoIDAccessors(dbID, tID).RandomID().Inc(1)
return err1
})
Expand All @@ -166,7 +166,7 @@ func TestModifyAutoRandColumnWithMetaKeyChanged(t *testing.T) {
var newTbInfo *model.TableInfo
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL)
err := kv.RunInNewTxn(ctx, store, false, func(ctx context.Context, txn kv.Transaction) error {
t := meta.NewMeta(txn)
t := meta.NewMutator(txn)
var err error
newTbInfo, err = t.GetTable(dbID, tID)
if err != nil {
Expand Down Expand Up @@ -361,7 +361,7 @@ type historyJobArgs struct {
func getSchemaVer(t *testing.T, ctx sessionctx.Context) int64 {
txn, err := newTxn(ctx)
require.NoError(t, err)
m := meta.NewMeta(txn)
m := meta.NewMutator(txn)
ver, err := m.GetSchemaVersion()
require.NoError(t, err)
return ver
Expand Down
Loading

0 comments on commit b427e33

Please sign in to comment.