Skip to content

Commit

Permalink
br: use a unique session for checkpoint runner (#56487)
Browse files Browse the repository at this point in the history
close #56488
  • Loading branch information
Leavrth authored Oct 9, 2024
1 parent df821b9 commit 6e84244
Show file tree
Hide file tree
Showing 10 changed files with 35 additions and 7 deletions.
1 change: 1 addition & 0 deletions br/pkg/checkpoint/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,7 @@ func (r *CheckpointRunner[K, V]) WaitForFinish(ctx context.Context, flush bool)
r.wg.Wait()
// remove the checkpoint lock
r.checkpointStorage.deleteLock(ctx)
r.checkpointStorage.close()
}

// Send the checksum to the flush goroutine, and reset the CheckpointRunner's checksum
Expand Down
8 changes: 8 additions & 0 deletions br/pkg/checkpoint/checkpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,8 @@ func TestCheckpointRestoreRunner(t *testing.T) {

checkpointRunner.WaitForFinish(ctx, true)

se, err = g.CreateSession(s.Mock.Storage)
require.NoError(t, err)
respCount := 0
checker := func(tableID int64, resp checkpoint.RestoreValueType) {
require.NotNil(t, resp)
Expand Down Expand Up @@ -395,6 +397,8 @@ func TestCheckpointRunnerRetry(t *testing.T) {
err = checkpointRunner.FlushChecksum(ctx, 3, 3, 3, 3)
require.NoError(t, err)
checkpointRunner.WaitForFinish(ctx, true)
se, err = g.CreateSession(s.Mock.Storage)
require.NoError(t, err)
recordSet := make(map[string]int)
_, err = checkpoint.LoadCheckpointDataForSnapshotRestore(ctx, se.GetSessionCtx().GetRestrictedSQLExecutor(),
func(tableID int64, rangeKey checkpoint.RestoreValueType) {
Expand Down Expand Up @@ -433,6 +437,8 @@ func TestCheckpointRunnerNoRetry(t *testing.T) {
require.NoError(t, err)
time.Sleep(time.Second)
checkpointRunner.WaitForFinish(ctx, true)
se, err = g.CreateSession(s.Mock.Storage)
require.NoError(t, err)
recordSet := make(map[string]int)
_, err = checkpoint.LoadCheckpointDataForSnapshotRestore(ctx, se.GetSessionCtx().GetRestrictedSQLExecutor(),
func(tableID int64, rangeKey checkpoint.RestoreValueType) {
Expand Down Expand Up @@ -501,6 +507,8 @@ func TestCheckpointLogRestoreRunner(t *testing.T) {

checkpointRunner.WaitForFinish(ctx, true)

se, err = g.CreateSession(s.Mock.Storage)
require.NoError(t, err)
respCount := 0
checker := func(metaKey string, resp checkpoint.LogRestoreValueMarshaled) {
require.NotNil(t, resp)
Expand Down
2 changes: 2 additions & 0 deletions br/pkg/checkpoint/external_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ func newExternalCheckpointStorage(
return checkpointStorage, nil
}

func (s *externalCheckpointStorage) close() {}

func (s *externalCheckpointStorage) flushCheckpointData(ctx context.Context, data []byte) error {
fname := fmt.Sprintf("%s/%x.cpt", s.CheckpointDataDir, uuid.New())
return s.storage.WriteFile(ctx, fname, data)
Expand Down
1 change: 1 addition & 0 deletions br/pkg/checkpoint/log_restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ func StartCheckpointLogRestoreRunnerForTest(
return runner, nil
}

// Notice that the session is owned by the checkpoint runner, and it will be also closed by it.
func StartCheckpointRunnerForLogRestore(
ctx context.Context,
se glue.Session,
Expand Down
1 change: 1 addition & 0 deletions br/pkg/checkpoint/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ func StartCheckpointRestoreRunnerForTest(
return runner, nil
}

// Notice that the session is owned by the checkpoint runner, and it will be also closed by it.
func StartCheckpointRunnerForRestore(
ctx context.Context,
se glue.Session,
Expand Down
8 changes: 8 additions & 0 deletions br/pkg/checkpoint/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ type checkpointStorage interface {
initialLock(ctx context.Context) error
updateLock(ctx context.Context) error
deleteLock(ctx context.Context)

close()
}

// Notice that:
Expand Down Expand Up @@ -124,6 +126,12 @@ type tableCheckpointStorage struct {
checkpointDBName string
}

func (s *tableCheckpointStorage) close() {
if s.se != nil {
s.se.Close()
}
}

func (s *tableCheckpointStorage) initialLock(ctx context.Context) error {
log.Fatal("unimplement!")
return nil
Expand Down
8 changes: 6 additions & 2 deletions br/pkg/restore/log_client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,8 +214,12 @@ func (rc *LogClient) CleanUpKVFiles(
return rc.fileImporter.ClearFiles(ctx, rc.pdClient, "v1")
}

func (rc *LogClient) StartCheckpointRunnerForLogRestore(ctx context.Context) (*checkpoint.CheckpointRunner[checkpoint.LogRestoreKeyType, checkpoint.LogRestoreValueType], error) {
runner, err := checkpoint.StartCheckpointRunnerForLogRestore(ctx, rc.se)
func (rc *LogClient) StartCheckpointRunnerForLogRestore(ctx context.Context, g glue.Glue, store kv.Storage) (*checkpoint.CheckpointRunner[checkpoint.LogRestoreKeyType, checkpoint.LogRestoreValueType], error) {
se, err := g.CreateSession(store)
if err != nil {
return nil, errors.Trace(err)
}
runner, err := checkpoint.StartCheckpointRunnerForLogRestore(ctx, se)
return runner, errors.Trace(err)
}

Expand Down
9 changes: 6 additions & 3 deletions br/pkg/restore/snap_client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ import (
tidalloc "github.com/pingcap/tidb/br/pkg/restore/internal/prealloc_table_id"
"github.com/pingcap/tidb/br/pkg/restore/split"
restoreutils "github.com/pingcap/tidb/br/pkg/restore/utils"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/br/pkg/summary"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/br/pkg/version"
Expand Down Expand Up @@ -300,7 +299,7 @@ func (rc *SnapClient) AllocTableIDs(ctx context.Context, tables []*metautil.Tabl
// storage.
func (rc *SnapClient) InitCheckpoint(
ctx context.Context,
s storage.ExternalStorage,
g glue.Glue, store kv.Storage,
config *pdutil.ClusterConfig,
checkpointFirstRun bool,
) (checkpointSetWithTableID map[int64]map[string]struct{}, checkpointClusterConfig *pdutil.ClusterConfig, err error) {
Expand Down Expand Up @@ -378,7 +377,11 @@ func (rc *SnapClient) InitCheckpoint(
}
}

rc.checkpointRunner, err = checkpoint.StartCheckpointRunnerForRestore(ctx, rc.db.Session())
se, err := g.CreateSession(store)
if err != nil {
return checkpointSetWithTableID, nil, errors.Trace(err)
}
rc.checkpointRunner, err = checkpoint.StartCheckpointRunnerForRestore(ctx, se)
return checkpointSetWithTableID, checkpointClusterConfig, errors.Trace(err)
}

Expand Down
2 changes: 1 addition & 1 deletion br/pkg/task/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -905,7 +905,7 @@ func runSnapshotRestore(c context.Context, mgr *conn.Mgr, g glue.Glue, cmdName s
// reload or register the checkpoint
var checkpointSetWithTableID map[int64]map[string]struct{}
if cfg.UseCheckpoint {
sets, restoreSchedulersConfigFromCheckpoint, err := client.InitCheckpoint(ctx, s, schedulersConfig, checkpointFirstRun)
sets, restoreSchedulersConfigFromCheckpoint, err := client.InitCheckpoint(ctx, g, mgr.GetStorage(), schedulersConfig, checkpointFirstRun)
if err != nil {
return errors.Trace(err)
}
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/task/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -1350,7 +1350,7 @@ func restoreStream(
}
oldRatio = oldRatioFromCheckpoint

checkpointRunner, err = client.StartCheckpointRunnerForLogRestore(ctx)
checkpointRunner, err = client.StartCheckpointRunnerForLogRestore(ctx, g, mgr.GetStorage())
if err != nil {
return errors.Trace(err)
}
Expand Down

0 comments on commit 6e84244

Please sign in to comment.