Skip to content

Commit

Permalink
[Disk Manager] move snapshot incremental logic to dataplane (#1843)
Browse files Browse the repository at this point in the history
* move snapshot incremental logic to dataplane

* fix issues

* fix issues

* fix issues

* fix issue
  • Loading branch information
BarkovBG authored Sep 4, 2024
1 parent 2b88e32 commit 38cbf0d
Show file tree
Hide file tree
Showing 20 changed files with 676 additions and 690 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/ydb-platform/nbs/cloud/disk_manager/internal/pkg/dataplane/snapshot"
"github.com/ydb-platform/nbs/cloud/disk_manager/internal/pkg/dataplane/snapshot/storage"
"github.com/ydb-platform/nbs/cloud/tasks"
"github.com/ydb-platform/nbs/cloud/tasks/logging"
)

////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -62,7 +63,29 @@ func (t *createSnapshotFromDiskTask) Cancel(
return err
}

return t.storage.DeletingSnapshot(ctx, t.request.DstSnapshotId)
_, err = t.storage.DeletingSnapshot(ctx, t.request.DstSnapshotId, execCtx.GetTaskID())
if err != nil {
return err
}

// NBS-3192.
if len(t.state.BaseSnapshotId) != 0 {
err := t.storage.UnlockSnapshot(
ctx,
t.state.BaseSnapshotId,
execCtx.GetTaskID(),
)
if err != nil {
return err
}
logging.Info(
ctx,
"Unlocked snapshot with id %v",
t.state.BaseSnapshotId,
)
}

return nil
}

func (t *createSnapshotFromDiskTask) GetMetadata(
Expand Down Expand Up @@ -101,6 +124,108 @@ func (t *createSnapshotFromDiskTask) saveProgress(
return execCtx.SaveState(ctx)
}

func (t *createSnapshotFromDiskTask) lockBaseSnapshot(
ctx context.Context,
execCtx tasks.ExecutionContext,
snapshotMeta storage.SnapshotMeta,
) (string, string, error) {

nbsClient, err := t.nbsFactory.GetClient(ctx, t.request.SrcDisk.ZoneId)
if err != nil {
return "", "", err
}

diskParams, err := nbsClient.Describe(ctx, t.request.SrcDisk.DiskId)
if err != nil {
return "", "", err
}

baseSnapshotID := snapshotMeta.BaseSnapshotID
baseCheckpointID := snapshotMeta.BaseCheckpointID

if diskParams.IsDiskRegistryBasedDisk {
logging.Info(
ctx,
"Performing full snapshot %v of disk %v because it is registry based",
snapshotMeta.ID,
t.request.SrcDisk.DiskId,
)
// TODO: enable incremental snapshots for such disks.
baseSnapshotID = ""
baseCheckpointID = ""
}

if len(baseSnapshotID) != 0 {
// Lock base snapshot to prevent deletion.
locked, err := t.storage.LockSnapshot(
ctx,
baseSnapshotID,
execCtx.GetTaskID(),
)
if err != nil {
return "", "", err
}

if locked {
logging.Info(
ctx,
"Locked snapshot with id %v",
baseSnapshotID,
)
} else {
logging.Info(
ctx,
"Snapshot with id %v can't be locked",
baseSnapshotID,
)
logging.Info(
ctx,
"Performing full snapshot %v of disk %v",
snapshotMeta.ID,
t.request.SrcDisk.DiskId,
)
baseSnapshotID = ""
baseCheckpointID = ""
}
}

return baseSnapshotID, baseCheckpointID, nil
}

func (t *createSnapshotFromDiskTask) unlockBaseSnapshot(
ctx context.Context,
execCtx tasks.ExecutionContext,
client nbs_client.Client,
) error {

if len(t.state.BaseSnapshotId) != 0 {
err := t.storage.UnlockSnapshot(
ctx,
t.state.BaseSnapshotId,
execCtx.GetTaskID(),
)
if err != nil {
return err
}
logging.Info(
ctx,
"Unlocked snapshot with id %v",
t.state.BaseSnapshotId,
)

err = client.DeleteCheckpoint(
ctx,
t.request.SrcDisk.DiskId,
t.state.BaseCheckpointId,
)
if err != nil {
return err
}
}

return nil
}

func (t *createSnapshotFromDiskTask) run(
ctx context.Context,
execCtx tasks.ExecutionContext,
Expand All @@ -111,10 +236,10 @@ func (t *createSnapshotFromDiskTask) run(
snapshotMeta, err := t.storage.CreateSnapshot(
ctx,
storage.SnapshotMeta{
ID: t.request.DstSnapshotId,
Disk: t.request.SrcDisk,
CheckpointID: t.request.SrcDiskCheckpointId,
BaseSnapshotID: t.request.BaseSnapshotId,
ID: t.request.DstSnapshotId,
Disk: t.request.SrcDisk,
CheckpointID: t.request.SrcDiskCheckpointId,
CreateTaskID: execCtx.GetTaskID(),
},
)
if err != nil {
Expand All @@ -126,6 +251,13 @@ func (t *createSnapshotFromDiskTask) run(
return nil
}

baseSnapshotID, baseCheckpointID, err := t.lockBaseSnapshot(ctx, execCtx, *snapshotMeta)
if err != nil {
return err
}
t.state.BaseSnapshotId = baseSnapshotID
t.state.BaseCheckpointId = baseCheckpointID

client, err := t.nbsFactory.GetClient(ctx, t.request.SrcDisk.ZoneId)
if err != nil {
return err
Expand All @@ -145,14 +277,14 @@ func (t *createSnapshotFromDiskTask) run(
return err
}

incremental := len(t.request.BaseSnapshotId) != 0
incremental := len(t.state.BaseSnapshotId) != 0

source, err := nbs.NewDiskSource(
ctx,
client,
t.request.SrcDisk.DiskId,
proxyOverlayDiskID,
t.request.SrcDiskBaseCheckpointId,
t.state.BaseCheckpointId,
t.request.SrcDiskCheckpointId,
diskParams.EncryptionDesc,
chunkSize,
Expand Down Expand Up @@ -200,7 +332,7 @@ func (t *createSnapshotFromDiskTask) run(

if incremental {
shallowSource := snapshot.NewSnapshotShallowSource(
t.request.BaseSnapshotId,
t.state.BaseSnapshotId,
t.request.DstSnapshotId,
t.storage,
)
Expand All @@ -221,7 +353,7 @@ func (t *createSnapshotFromDiskTask) run(
if incremental {
_, err := t.storage.CheckSnapshotReady(
ctx,
t.request.BaseSnapshotId,
t.state.BaseSnapshotId,
)
if err != nil {
return err
Expand Down Expand Up @@ -267,6 +399,11 @@ func (t *createSnapshotFromDiskTask) run(
return err
}

err = t.unlockBaseSnapshot(ctx, execCtx, client)
if err != nil {
return err
}

return t.storage.SnapshotCreated(
ctx,
t.request.DstSnapshotId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,8 @@ func (t *createSnapshotFromLegacySnapshotTask) Cancel(
execCtx tasks.ExecutionContext,
) error {

return t.storage.DeletingSnapshot(ctx, t.request.DstSnapshotId)
_, err := t.storage.DeletingSnapshot(ctx, t.request.DstSnapshotId, execCtx.GetTaskID())
return err
}

func (t *createSnapshotFromLegacySnapshotTask) GetMetadata(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,8 @@ func (t *createSnapshotFromSnapshotTask) Cancel(
execCtx tasks.ExecutionContext,
) error {

return t.storage.DeletingSnapshot(ctx, t.request.DstSnapshotId)
_, err := t.storage.DeletingSnapshot(ctx, t.request.DstSnapshotId, execCtx.GetTaskID())
return err
}

func (t *createSnapshotFromSnapshotTask) GetMetadata(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,8 @@ func (t *createSnapshotFromURLTask) Cancel(
execCtx tasks.ExecutionContext,
) error {

return t.storage.DeletingSnapshot(ctx, t.request.DstSnapshotId)
_, err := t.storage.DeletingSnapshot(ctx, t.request.DstSnapshotId, execCtx.GetTaskID())
return err
}

func (t *createSnapshotFromURLTask) GetMetadata(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/ydb-platform/nbs/cloud/disk_manager/internal/pkg/dataplane/snapshot/storage"
"github.com/ydb-platform/nbs/cloud/tasks"
"github.com/ydb-platform/nbs/cloud/tasks/errors"
"github.com/ydb-platform/nbs/cloud/tasks/logging"
)

////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -39,7 +40,28 @@ func (t *deleteSnapshotTask) deletingSnapshot(
execCtx tasks.ExecutionContext,
) error {

return t.storage.DeletingSnapshot(ctx, t.request.SnapshotId)
snapshotMeta, err := t.storage.DeletingSnapshot(ctx, t.request.SnapshotId, execCtx.GetTaskID())
if err != nil {
return err
}

if len(snapshotMeta.BaseSnapshotID) != 0 {
err := t.storage.UnlockSnapshot(
ctx,
snapshotMeta.BaseSnapshotID,
snapshotMeta.CreateTaskID,
)
if err != nil {
return err
}
logging.Info(
ctx,
"Unlocked snapshot with id %v",
snapshotMeta.BaseSnapshotID,
)
}

return nil
}

func (t *deleteSnapshotTask) Run(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ option go_package = "github.com/ydb-platform/nbs/cloud/disk_manager/internal/pkg

message CreateSnapshotFromDiskRequest {
reserved 3;
reserved 5;
reserved 6;

types.Disk SrcDisk = 1;
string SrcDiskBaseCheckpointId = 5;
string SrcDiskCheckpointId = 2;
string BaseSnapshotId = 6;
string DstSnapshotId = 4;
bool UseS3 = 7;
bool UseProxyOverlayDisk = 8;
Expand All @@ -38,6 +38,8 @@ message CreateSnapshotFromDiskTaskState {
uint32 ChunkCount = 5;
uint64 TransferredDataSize = 8;
optional bool ProxyOverlayDiskCreated = 10;
string BaseSnapshotId = 12;
string BaseCheckpointId = 13;
}

message CreateSnapshotFromDiskMetadata {
Expand Down
Loading

0 comments on commit 38cbf0d

Please sign in to comment.