diff --git a/br/pkg/checkpoint/checkpoint.go b/br/pkg/checkpoint/checkpoint.go index 4263e9ca27a39..765ede725fb98 100644 --- a/br/pkg/checkpoint/checkpoint.go +++ b/br/pkg/checkpoint/checkpoint.go @@ -262,6 +262,7 @@ func (r *CheckpointRunner[K, V]) WaitForFinish(ctx context.Context, flush bool) r.wg.Wait() // remove the checkpoint lock r.checkpointStorage.deleteLock(ctx) + r.checkpointStorage.close() } // Send the checksum to the flush goroutine, and reset the CheckpointRunner's checksum diff --git a/br/pkg/checkpoint/checkpoint_test.go b/br/pkg/checkpoint/checkpoint_test.go index 2a3ecdd1a4a41..c6756f8058c5c 100644 --- a/br/pkg/checkpoint/checkpoint_test.go +++ b/br/pkg/checkpoint/checkpoint_test.go @@ -326,6 +326,8 @@ func TestCheckpointRestoreRunner(t *testing.T) { checkpointRunner.WaitForFinish(ctx, true) + se, err = g.CreateSession(s.Mock.Storage) + require.NoError(t, err) respCount := 0 checker := func(tableID int64, resp checkpoint.RestoreValueType) { require.NotNil(t, resp) @@ -395,6 +397,8 @@ func TestCheckpointRunnerRetry(t *testing.T) { err = checkpointRunner.FlushChecksum(ctx, 3, 3, 3, 3) require.NoError(t, err) checkpointRunner.WaitForFinish(ctx, true) + se, err = g.CreateSession(s.Mock.Storage) + require.NoError(t, err) recordSet := make(map[string]int) _, err = checkpoint.LoadCheckpointDataForSnapshotRestore(ctx, se.GetSessionCtx().GetRestrictedSQLExecutor(), func(tableID int64, rangeKey checkpoint.RestoreValueType) { @@ -433,6 +437,8 @@ func TestCheckpointRunnerNoRetry(t *testing.T) { require.NoError(t, err) time.Sleep(time.Second) checkpointRunner.WaitForFinish(ctx, true) + se, err = g.CreateSession(s.Mock.Storage) + require.NoError(t, err) recordSet := make(map[string]int) _, err = checkpoint.LoadCheckpointDataForSnapshotRestore(ctx, se.GetSessionCtx().GetRestrictedSQLExecutor(), func(tableID int64, rangeKey checkpoint.RestoreValueType) { @@ -501,6 +507,8 @@ func TestCheckpointLogRestoreRunner(t *testing.T) { checkpointRunner.WaitForFinish(ctx, true) + se, err = g.CreateSession(s.Mock.Storage) + require.NoError(t, err) respCount := 0 checker := func(metaKey string, resp checkpoint.LogRestoreValueMarshaled) { require.NotNil(t, resp) diff --git a/br/pkg/checkpoint/external_storage.go b/br/pkg/checkpoint/external_storage.go index d6d8e30c67996..078f2f1294e91 100644 --- a/br/pkg/checkpoint/external_storage.go +++ b/br/pkg/checkpoint/external_storage.go @@ -57,6 +57,8 @@ func newExternalCheckpointStorage( return checkpointStorage, nil } +func (s *externalCheckpointStorage) close() {} + func (s *externalCheckpointStorage) flushCheckpointData(ctx context.Context, data []byte) error { fname := fmt.Sprintf("%s/%x.cpt", s.CheckpointDataDir, uuid.New()) return s.storage.WriteFile(ctx, fname, data) diff --git a/br/pkg/checkpoint/log_restore.go b/br/pkg/checkpoint/log_restore.go index e2bc3d469eb74..b2ae3c398a3c8 100644 --- a/br/pkg/checkpoint/log_restore.go +++ b/br/pkg/checkpoint/log_restore.go @@ -120,6 +120,7 @@ func StartCheckpointLogRestoreRunnerForTest( return runner, nil } +// Notice that the session is owned by the checkpoint runner, and it will be also closed by it. func StartCheckpointRunnerForLogRestore( ctx context.Context, se glue.Session, diff --git a/br/pkg/checkpoint/restore.go b/br/pkg/checkpoint/restore.go index 5fc348c38daf0..88ff6f8f204de 100644 --- a/br/pkg/checkpoint/restore.go +++ b/br/pkg/checkpoint/restore.go @@ -56,6 +56,7 @@ func StartCheckpointRestoreRunnerForTest( return runner, nil } +// Notice that the session is owned by the checkpoint runner, and it will be also closed by it. func StartCheckpointRunnerForRestore( ctx context.Context, se glue.Session, diff --git a/br/pkg/checkpoint/storage.go b/br/pkg/checkpoint/storage.go index c46d955209306..465924f8300f4 100644 --- a/br/pkg/checkpoint/storage.go +++ b/br/pkg/checkpoint/storage.go @@ -39,6 +39,8 @@ type checkpointStorage interface { initialLock(ctx context.Context) error updateLock(ctx context.Context) error deleteLock(ctx context.Context) + + close() } // Notice that: @@ -124,6 +126,12 @@ type tableCheckpointStorage struct { checkpointDBName string } +func (s *tableCheckpointStorage) close() { + if s.se != nil { + s.se.Close() + } +} + func (s *tableCheckpointStorage) initialLock(ctx context.Context) error { log.Fatal("unimplement!") return nil diff --git a/br/pkg/restore/log_client/client.go b/br/pkg/restore/log_client/client.go index 888540bed1809..d208b58bb15d2 100644 --- a/br/pkg/restore/log_client/client.go +++ b/br/pkg/restore/log_client/client.go @@ -214,8 +214,12 @@ func (rc *LogClient) CleanUpKVFiles( return rc.fileImporter.ClearFiles(ctx, rc.pdClient, "v1") } -func (rc *LogClient) StartCheckpointRunnerForLogRestore(ctx context.Context) (*checkpoint.CheckpointRunner[checkpoint.LogRestoreKeyType, checkpoint.LogRestoreValueType], error) { - runner, err := checkpoint.StartCheckpointRunnerForLogRestore(ctx, rc.se) +func (rc *LogClient) StartCheckpointRunnerForLogRestore(ctx context.Context, g glue.Glue, store kv.Storage) (*checkpoint.CheckpointRunner[checkpoint.LogRestoreKeyType, checkpoint.LogRestoreValueType], error) { + se, err := g.CreateSession(store) + if err != nil { + return nil, errors.Trace(err) + } + runner, err := checkpoint.StartCheckpointRunnerForLogRestore(ctx, se) return runner, errors.Trace(err) } diff --git a/br/pkg/restore/snap_client/client.go b/br/pkg/restore/snap_client/client.go index 6284aed0f2d29..d0f68157ff410 100644 --- a/br/pkg/restore/snap_client/client.go +++ b/br/pkg/restore/snap_client/client.go @@ -45,7 +45,6 @@ import ( tidalloc "github.com/pingcap/tidb/br/pkg/restore/internal/prealloc_table_id" "github.com/pingcap/tidb/br/pkg/restore/split" restoreutils "github.com/pingcap/tidb/br/pkg/restore/utils" - "github.com/pingcap/tidb/br/pkg/storage" "github.com/pingcap/tidb/br/pkg/summary" "github.com/pingcap/tidb/br/pkg/utils" "github.com/pingcap/tidb/br/pkg/version" @@ -300,7 +299,7 @@ func (rc *SnapClient) AllocTableIDs(ctx context.Context, tables []*metautil.Tabl // storage. func (rc *SnapClient) InitCheckpoint( ctx context.Context, - s storage.ExternalStorage, + g glue.Glue, store kv.Storage, config *pdutil.ClusterConfig, checkpointFirstRun bool, ) (checkpointSetWithTableID map[int64]map[string]struct{}, checkpointClusterConfig *pdutil.ClusterConfig, err error) { @@ -378,7 +377,11 @@ func (rc *SnapClient) InitCheckpoint( } } - rc.checkpointRunner, err = checkpoint.StartCheckpointRunnerForRestore(ctx, rc.db.Session()) + se, err := g.CreateSession(store) + if err != nil { + return checkpointSetWithTableID, nil, errors.Trace(err) + } + rc.checkpointRunner, err = checkpoint.StartCheckpointRunnerForRestore(ctx, se) return checkpointSetWithTableID, checkpointClusterConfig, errors.Trace(err) } diff --git a/br/pkg/task/restore.go b/br/pkg/task/restore.go index 21f7686f1255c..5bee261ceff21 100644 --- a/br/pkg/task/restore.go +++ b/br/pkg/task/restore.go @@ -905,7 +905,7 @@ func runSnapshotRestore(c context.Context, mgr *conn.Mgr, g glue.Glue, cmdName s // reload or register the checkpoint var checkpointSetWithTableID map[int64]map[string]struct{} if cfg.UseCheckpoint { - sets, restoreSchedulersConfigFromCheckpoint, err := client.InitCheckpoint(ctx, s, schedulersConfig, checkpointFirstRun) + sets, restoreSchedulersConfigFromCheckpoint, err := client.InitCheckpoint(ctx, g, mgr.GetStorage(), schedulersConfig, checkpointFirstRun) if err != nil { return errors.Trace(err) } diff --git a/br/pkg/task/stream.go b/br/pkg/task/stream.go index c6935fcaaf544..6009eb47a6c70 100644 --- a/br/pkg/task/stream.go +++ b/br/pkg/task/stream.go @@ -1350,7 +1350,7 @@ func restoreStream( } oldRatio = oldRatioFromCheckpoint - checkpointRunner, err = client.StartCheckpointRunnerForLogRestore(ctx) + checkpointRunner, err = client.StartCheckpointRunnerForLogRestore(ctx, g, mgr.GetStorage()) if err != nil { return errors.Trace(err) }