Skip to content

Commit

Permalink
move snapshot incremental logic to dataplane
Browse files Browse the repository at this point in the history
  • Loading branch information
BarkovBG committed Aug 30, 2024
1 parent 8d6136f commit 676eb01
Show file tree
Hide file tree
Showing 20 changed files with 653 additions and 686 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/ydb-platform/nbs/cloud/disk_manager/internal/pkg/dataplane/snapshot"
"github.com/ydb-platform/nbs/cloud/disk_manager/internal/pkg/dataplane/snapshot/storage"
"github.com/ydb-platform/nbs/cloud/tasks"
"github.com/ydb-platform/nbs/cloud/tasks/logging"
)

////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -62,7 +63,29 @@ func (t *createSnapshotFromDiskTask) Cancel(
return err
}

return t.storage.DeletingSnapshot(ctx, t.request.DstSnapshotId)
_, err = t.storage.DeletingSnapshot(ctx, t.request.DstSnapshotId, execCtx.GetTaskID())
if err != nil {
return err
}

// NBS-3192.
if len(t.state.BaseSnapshotId) != 0 {
err := t.storage.UnlockSnapshot(
ctx,
t.state.BaseSnapshotId,
execCtx.GetTaskID(),
)
if err != nil {
return err
}
logging.Info(
ctx,
"Successfully unlocked snapshot with id %v",
t.state.BaseSnapshotId,
)
}

return nil
}

func (t *createSnapshotFromDiskTask) GetMetadata(
Expand Down Expand Up @@ -101,6 +124,65 @@ func (t *createSnapshotFromDiskTask) saveProgress(
return execCtx.SaveState(ctx)
}

func (t *createSnapshotFromDiskTask) getIncremental(
ctx context.Context,
execCtx tasks.ExecutionContext,
snapshotMeta storage.SnapshotMeta,
) (string, string, error) {

nbsClient, err := t.nbsFactory.GetClient(ctx, t.request.SrcDisk.ZoneId)
if err != nil {
return "", "", err
}

diskParams, err := nbsClient.Describe(ctx, t.request.SrcDisk.DiskId)
if err != nil {
return "", "", err
}

baseSnapshotID := snapshotMeta.BaseSnapshotID
baseCheckpointID := snapshotMeta.BaseCheckpointID

if diskParams.IsDiskRegistryBasedDisk {
// Should perform full snapshot of disk.
// TODO: enable incremental snapshots for such disks.
baseSnapshotID = ""
baseCheckpointID = ""
}

if len(baseSnapshotID) != 0 {
// Lock base snapshot to prevent deletion.
locked, err := t.storage.LockSnapshot(
ctx,
baseSnapshotID,
execCtx.GetTaskID(),
)
if err != nil {
return "", "", err
}

if locked {
logging.Info(
ctx,
"Successfully locked snapshot with id %v",
baseSnapshotID,
)
} else {
logging.Info(
ctx,
"Snapshot with id %v can't be locked",
baseSnapshotID,
)

// Should perform full snapshot of disk.
baseSnapshotID = ""
baseCheckpointID = ""
}
}

return baseSnapshotID, baseCheckpointID, nil
}

func (t *createSnapshotFromDiskTask) run(
ctx context.Context,
execCtx tasks.ExecutionContext,
Expand All @@ -111,10 +193,10 @@ func (t *createSnapshotFromDiskTask) run(
snapshotMeta, err := t.storage.CreateSnapshot(
ctx,
storage.SnapshotMeta{
ID: t.request.DstSnapshotId,
Disk: t.request.SrcDisk,
CheckpointID: t.request.SrcDiskCheckpointId,
BaseSnapshotID: t.request.BaseSnapshotId,
ID: t.request.DstSnapshotId,
Disk: t.request.SrcDisk,
CheckpointID: t.request.SrcDiskCheckpointId,
CreateTaskID: execCtx.GetTaskID(),
},
)
if err != nil {
Expand All @@ -126,6 +208,13 @@ func (t *createSnapshotFromDiskTask) run(
return nil
}

baseSnapshotID, baseCheckpointID, err := t.getIncremental(ctx, execCtx, *snapshotMeta)
if err != nil {
return err
}
t.state.BaseSnapshotId = baseSnapshotID
t.state.BaseCheckpointId = baseCheckpointID

client, err := t.nbsFactory.GetClient(ctx, t.request.SrcDisk.ZoneId)
if err != nil {
return err
Expand All @@ -145,14 +234,14 @@ func (t *createSnapshotFromDiskTask) run(
return err
}

incremental := len(t.request.BaseSnapshotId) != 0
incremental := len(t.state.BaseSnapshotId) != 0

source, err := nbs.NewDiskSource(
ctx,
client,
t.request.SrcDisk.DiskId,
proxyOverlayDiskID,
t.request.SrcDiskBaseCheckpointId,
t.state.BaseCheckpointId,
t.request.SrcDiskCheckpointId,
diskParams.EncryptionDesc,
chunkSize,
Expand Down Expand Up @@ -200,7 +289,7 @@ func (t *createSnapshotFromDiskTask) run(

if incremental {
shallowSource := snapshot.NewSnapshotShallowSource(
t.request.BaseSnapshotId,
t.state.BaseSnapshotId,
t.request.DstSnapshotId,
t.storage,
)
Expand All @@ -221,7 +310,7 @@ func (t *createSnapshotFromDiskTask) run(
if incremental {
_, err := t.storage.CheckSnapshotReady(
ctx,
t.request.BaseSnapshotId,
t.state.BaseSnapshotId,
)
if err != nil {
return err
Expand Down Expand Up @@ -267,6 +356,31 @@ func (t *createSnapshotFromDiskTask) run(
return err
}

if len(t.state.BaseSnapshotId) != 0 {
err := t.storage.UnlockSnapshot(
ctx,
t.state.BaseSnapshotId,
selfTaskID,
)
if err != nil {
return err
}
logging.Info(
ctx,
"Successfully unlocked snapshot with id %v",
t.state.BaseSnapshotId,
)

err = client.DeleteCheckpoint(
ctx,
t.request.SrcDisk.DiskId,
t.state.BaseSnapshotId,
)
if err != nil {
return err
}
}

return t.storage.SnapshotCreated(
ctx,
t.request.DstSnapshotId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,8 @@ func (t *createSnapshotFromLegacySnapshotTask) Cancel(
execCtx tasks.ExecutionContext,
) error {

return t.storage.DeletingSnapshot(ctx, t.request.DstSnapshotId)
_, err := t.storage.DeletingSnapshot(ctx, t.request.DstSnapshotId, execCtx.GetTaskID())
return err
}

func (t *createSnapshotFromLegacySnapshotTask) GetMetadata(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,8 @@ func (t *createSnapshotFromSnapshotTask) Cancel(
execCtx tasks.ExecutionContext,
) error {

return t.storage.DeletingSnapshot(ctx, t.request.DstSnapshotId)
_, err := t.storage.DeletingSnapshot(ctx, t.request.DstSnapshotId, execCtx.GetTaskID())
return err
}

func (t *createSnapshotFromSnapshotTask) GetMetadata(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,8 @@ func (t *createSnapshotFromURLTask) Cancel(
execCtx tasks.ExecutionContext,
) error {

return t.storage.DeletingSnapshot(ctx, t.request.DstSnapshotId)
_, err := t.storage.DeletingSnapshot(ctx, t.request.DstSnapshotId, execCtx.GetTaskID())
return err
}

func (t *createSnapshotFromURLTask) GetMetadata(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/ydb-platform/nbs/cloud/disk_manager/internal/pkg/dataplane/snapshot/storage"
"github.com/ydb-platform/nbs/cloud/tasks"
"github.com/ydb-platform/nbs/cloud/tasks/errors"
"github.com/ydb-platform/nbs/cloud/tasks/logging"
)

////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -39,7 +40,28 @@ func (t *deleteSnapshotTask) deletingSnapshot(
execCtx tasks.ExecutionContext,
) error {

return t.storage.DeletingSnapshot(ctx, t.request.SnapshotId)
snapshotMeta, err := t.storage.DeletingSnapshot(ctx, t.request.SnapshotId, execCtx.GetTaskID())
if err != nil {
return err
}

if len(snapshotMeta.BaseSnapshotID) != 0 {
err := t.storage.UnlockSnapshot(
ctx,
snapshotMeta.BaseSnapshotID,
snapshotMeta.CreateTaskID,
)
if err != nil {
return err
}
logging.Info(
ctx,
"Successfully unlocked snapshot with id %v",
snapshotMeta.BaseSnapshotID,
)
}

return nil
}

func (t *deleteSnapshotTask) Run(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ option go_package = "github.com/ydb-platform/nbs/cloud/disk_manager/internal/pkg

message CreateSnapshotFromDiskRequest {
reserved 3;
reserved 5;
reserved 6;

types.Disk SrcDisk = 1;
string SrcDiskBaseCheckpointId = 5;
string SrcDiskCheckpointId = 2;
string BaseSnapshotId = 6;
string DstSnapshotId = 4;
bool UseS3 = 7;
bool UseProxyOverlayDisk = 8;
Expand All @@ -38,6 +38,8 @@ message CreateSnapshotFromDiskTaskState {
uint32 ChunkCount = 5;
uint64 TransferredDataSize = 8;
optional bool ProxyOverlayDiskCreated = 10;
string BaseSnapshotId = 12;
string BaseCheckpointId = 13;
}

message CreateSnapshotFromDiskMetadata {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,23 +48,35 @@ type snapshotState struct {
zoneID string
diskID string
checkpointID string
createTaskID string
creatingAt time.Time
createdAt time.Time
deletingAt time.Time
baseSnapshotID string
baseCheckpointID string
size uint64
storageSize uint64
chunkCount uint32
lockTaskID string
encryptionMode uint32
encryptionKeyHash []byte
status snapshotStatus
}

func (s *snapshotState) toSnapshotMeta() *SnapshotMeta {
return &SnapshotMeta{
Size: s.size,
StorageSize: s.storageSize,
ChunkCount: s.chunkCount,
ID: s.id,
Disk: &types.Disk{
ZoneId: s.zoneID,
DiskId: s.diskID,
},
CheckpointID: s.checkpointID,
CreateTaskID: s.createTaskID,
BaseSnapshotID: s.baseSnapshotID,
BaseCheckpointID: s.baseCheckpointID,
Size: s.size,
StorageSize: s.storageSize,
ChunkCount: s.chunkCount,
Encryption: &types.EncryptionDesc{
Mode: types.EncryptionMode(s.encryptionMode),
Key: &types.EncryptionDesc_KeyHash{
Expand All @@ -81,13 +93,16 @@ func (s *snapshotState) structValue() persistence.Value {
persistence.StructFieldValue("zone_id", persistence.UTF8Value(s.zoneID)),
persistence.StructFieldValue("disk_id", persistence.UTF8Value(s.diskID)),
persistence.StructFieldValue("checkpoint_id", persistence.UTF8Value(s.checkpointID)),
persistence.StructFieldValue("create_task_id", persistence.UTF8Value(s.createTaskID)),
persistence.StructFieldValue("creating_at", persistence.TimestampValue(s.creatingAt)),
persistence.StructFieldValue("created_at", persistence.TimestampValue(s.createdAt)),
persistence.StructFieldValue("deleting_at", persistence.TimestampValue(s.deletingAt)),
persistence.StructFieldValue("base_snapshot_id", persistence.UTF8Value(s.baseSnapshotID)),
persistence.StructFieldValue("base_checkpoint_id", persistence.UTF8Value(s.baseCheckpointID)),
persistence.StructFieldValue("size", persistence.Uint64Value(s.size)),
persistence.StructFieldValue("storage_size", persistence.Uint64Value(s.storageSize)),
persistence.StructFieldValue("chunk_count", persistence.Uint32Value(s.chunkCount)),
persistence.StructFieldValue("lock_task_id", persistence.UTF8Value(s.lockTaskID)),
persistence.StructFieldValue("encryption_mode", persistence.Uint32Value(s.encryptionMode)),
persistence.StructFieldValue("encryption_keyhash", persistence.StringValue(s.encryptionKeyHash)),
persistence.StructFieldValue("status", persistence.Int64Value(int64(s.status))),
Expand All @@ -100,13 +115,16 @@ func scanSnapshotState(res persistence.Result) (state snapshotState, err error)
persistence.OptionalWithDefault("zone_id", &state.zoneID),
persistence.OptionalWithDefault("disk_id", &state.diskID),
persistence.OptionalWithDefault("checkpoint_id", &state.checkpointID),
persistence.OptionalWithDefault("create_task_id", &state.createTaskID),
persistence.OptionalWithDefault("creating_at", &state.creatingAt),
persistence.OptionalWithDefault("created_at", &state.createdAt),
persistence.OptionalWithDefault("deleting_at", &state.deletingAt),
persistence.OptionalWithDefault("base_snapshot_id", &state.baseSnapshotID),
persistence.OptionalWithDefault("base_checkpoint_id", &state.baseCheckpointID),
persistence.OptionalWithDefault("size", &state.size),
persistence.OptionalWithDefault("storage_size", &state.storageSize),
persistence.OptionalWithDefault("chunk_count", &state.chunkCount),
persistence.OptionalWithDefault("lock_task_id", &state.lockTaskID),
persistence.OptionalWithDefault("encryption_mode", &state.encryptionMode),
persistence.OptionalWithDefault("encryption_keyhash", &state.encryptionKeyHash),
persistence.OptionalWithDefault("status", &state.status),
Expand Down Expand Up @@ -143,13 +161,16 @@ func snapshotStateStructTypeString() string {
zone_id: Utf8,
disk_id: Utf8,
checkpoint_id: Utf8,
create_task_id: Utf8,
creating_at: Timestamp,
created_at: Timestamp,
deleting_at: Timestamp,
base_snapshot_id: Utf8,
base_checkpoint_id: Utf8,
size: Uint64,
storage_size: Uint64,
chunk_count: Uint32,
lock_task_id: Utf8,
encryption_mode: Uint32,
encryption_keyhash: String,
status: Int64>`
Expand Down
Loading

0 comments on commit 676eb01

Please sign in to comment.