Skip to content

Commit

Permalink
move snapshot incremental logic to dataplane
Browse files Browse the repository at this point in the history
  • Loading branch information
BarkovBG committed Aug 29, 2024
1 parent 8d6136f commit 5c694dd
Show file tree
Hide file tree
Showing 17 changed files with 499 additions and 324 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/ydb-platform/nbs/cloud/disk_manager/internal/pkg/dataplane/snapshot"
"github.com/ydb-platform/nbs/cloud/disk_manager/internal/pkg/dataplane/snapshot/storage"
"github.com/ydb-platform/nbs/cloud/tasks"
"github.com/ydb-platform/nbs/cloud/tasks/logging"
)

////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -62,7 +63,30 @@ func (t *createSnapshotFromDiskTask) Cancel(
return err
}

return t.storage.DeletingSnapshot(ctx, t.request.DstSnapshotId)
snapshotMeta, err := t.storage.DeletingSnapshot(ctx, t.request.DstSnapshotId)
if err != nil {
return err
}

// NBS-3192.
if len(snapshotMeta.BaseSnapshotID) != 0 {
err := t.storage.UnlockSnapshot(
ctx,
snapshotMeta.BaseSnapshotID,
execCtx.GetTaskID(),
)
if err != nil {
return err
}

logging.Info(
ctx,
"Successfully unlocked snapshot with id %v",
snapshotMeta.BaseSnapshotID,
)
}

return nil
}

func (t *createSnapshotFromDiskTask) GetMetadata(
Expand Down Expand Up @@ -111,10 +135,10 @@ func (t *createSnapshotFromDiskTask) run(
snapshotMeta, err := t.storage.CreateSnapshot(
ctx,
storage.SnapshotMeta{
ID: t.request.DstSnapshotId,
Disk: t.request.SrcDisk,
CheckpointID: t.request.SrcDiskCheckpointId,
BaseSnapshotID: t.request.BaseSnapshotId,
ID: t.request.DstSnapshotId,
Disk: t.request.SrcDisk,
CheckpointID: t.request.SrcDiskCheckpointId,
CreateTaskID: execCtx.GetTaskID(),
},
)
if err != nil {
Expand All @@ -126,6 +150,56 @@ func (t *createSnapshotFromDiskTask) run(
return nil
}

nbsClient, err := t.nbsFactory.GetClient(ctx, t.request.SrcDisk.ZoneId)
if err != nil {
return err
}

diskParams, err := nbsClient.Describe(ctx, t.request.SrcDisk.DiskId)
if err != nil {
return err
}

baseSnapshotID := snapshotMeta.BaseSnapshotID
baseCheckpointID := snapshotMeta.BaseCheckpointID

if diskParams.IsDiskRegistryBasedDisk {
// Should perform full snapshot of disk.
// TODO: enable incremental snapshots for such disks.
baseSnapshotID = ""
baseCheckpointID = ""
}

if len(baseSnapshotID) != 0 {
// Lock base snapshot to prevent deletion.
locked, err := t.storage.LockSnapshot(
ctx,
snapshotMeta.BaseSnapshotID,
selfTaskID,
)
if err != nil {
return err
}

if locked {
logging.Info(
ctx,
"Successfully locked snapshot with id %v",
snapshotMeta.BaseSnapshotID,
)
} else {
logging.Info(
ctx,
"Snapshot with id %v can't be locked",
snapshotMeta.BaseSnapshotID,
)

// Should perform full snapshot of disk.
baseSnapshotID = ""
baseCheckpointID = ""
}
}

client, err := t.nbsFactory.GetClient(ctx, t.request.SrcDisk.ZoneId)
if err != nil {
return err
Expand All @@ -140,20 +214,15 @@ func (t *createSnapshotFromDiskTask) run(
return err
}

diskParams, err := client.Describe(ctx, t.request.SrcDisk.DiskId)
if err != nil {
return err
}

incremental := len(t.request.BaseSnapshotId) != 0
incremental := len(baseSnapshotID) != 0

source, err := nbs.NewDiskSource(
ctx,
client,
t.request.SrcDisk.DiskId,
proxyOverlayDiskID,
t.request.SrcDiskBaseCheckpointId,
t.request.SrcDiskCheckpointId,
baseSnapshotID,
baseCheckpointID,
diskParams.EncryptionDesc,
chunkSize,
incremental, // duplicateChunkIndices
Expand Down Expand Up @@ -267,6 +336,34 @@ func (t *createSnapshotFromDiskTask) run(
return err
}

if len(snapshotMeta.BaseSnapshotID) != 0 {
err := t.storage.UnlockSnapshot(
ctx,
snapshotMeta.BaseSnapshotID,
selfTaskID,
)
if err != nil {
return err
}

logging.Info(
ctx,
"Successfully unlocked snapshot with id %v",
snapshotMeta.BaseSnapshotID,
)
}

if len(snapshotMeta.BaseSnapshotID) != 0 {
err = nbsClient.DeleteCheckpoint(
ctx,
t.request.SrcDisk.DiskId,
snapshotMeta.BaseCheckpointID,
)
if err != nil {
return err
}
}

return t.storage.SnapshotCreated(
ctx,
t.request.DstSnapshotId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,8 @@ func (t *createSnapshotFromLegacySnapshotTask) Cancel(
execCtx tasks.ExecutionContext,
) error {

return t.storage.DeletingSnapshot(ctx, t.request.DstSnapshotId)
_, err := t.storage.DeletingSnapshot(ctx, t.request.DstSnapshotId)
return err
}

func (t *createSnapshotFromLegacySnapshotTask) GetMetadata(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,8 @@ func (t *createSnapshotFromSnapshotTask) Cancel(
execCtx tasks.ExecutionContext,
) error {

return t.storage.DeletingSnapshot(ctx, t.request.DstSnapshotId)
_, err := t.storage.DeletingSnapshot(ctx, t.request.DstSnapshotId)
return err
}

func (t *createSnapshotFromSnapshotTask) GetMetadata(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,8 @@ func (t *createSnapshotFromURLTask) Cancel(
execCtx tasks.ExecutionContext,
) error {

return t.storage.DeletingSnapshot(ctx, t.request.DstSnapshotId)
_, err := t.storage.DeletingSnapshot(ctx, t.request.DstSnapshotId)
return err
}

func (t *createSnapshotFromURLTask) GetMetadata(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/ydb-platform/nbs/cloud/disk_manager/internal/pkg/dataplane/snapshot/storage"
"github.com/ydb-platform/nbs/cloud/tasks"
"github.com/ydb-platform/nbs/cloud/tasks/errors"
"github.com/ydb-platform/nbs/cloud/tasks/logging"
)

////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -39,7 +40,29 @@ func (t *deleteSnapshotTask) deletingSnapshot(
execCtx tasks.ExecutionContext,
) error {

return t.storage.DeletingSnapshot(ctx, t.request.SnapshotId)
snapshotMeta, err := t.storage.DeletingSnapshot(ctx, t.request.SnapshotId)
if err != nil {
return err
}

if len(snapshotMeta.BaseSnapshotID) != 0 {
err := t.storage.UnlockSnapshot(
ctx,
snapshotMeta.BaseSnapshotID,
snapshotMeta.CreateTaskID,
)
if err != nil {
return err
}

logging.Info(
ctx,
"Successfully unlocked snapshot with id %v",
snapshotMeta.BaseSnapshotID,
)
}

return nil
}

func (t *deleteSnapshotTask) Run(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,16 @@ type snapshotState struct {
zoneID string
diskID string
checkpointID string
createTaskID string
creatingAt time.Time
createdAt time.Time
deletingAt time.Time
baseSnapshotID string
baseCheckpointID string
size uint64
storageSize uint64
chunkCount uint32
lockTaskID string
encryptionMode uint32
encryptionKeyHash []byte
status snapshotStatus
Expand All @@ -81,13 +84,16 @@ func (s *snapshotState) structValue() persistence.Value {
persistence.StructFieldValue("zone_id", persistence.UTF8Value(s.zoneID)),
persistence.StructFieldValue("disk_id", persistence.UTF8Value(s.diskID)),
persistence.StructFieldValue("checkpoint_id", persistence.UTF8Value(s.checkpointID)),
persistence.StructFieldValue("create_task_id", persistence.UTF8Value(s.createTaskID)),
persistence.StructFieldValue("creating_at", persistence.TimestampValue(s.creatingAt)),
persistence.StructFieldValue("created_at", persistence.TimestampValue(s.createdAt)),
persistence.StructFieldValue("deleting_at", persistence.TimestampValue(s.deletingAt)),
persistence.StructFieldValue("base_snapshot_id", persistence.UTF8Value(s.baseSnapshotID)),
persistence.StructFieldValue("base_checkpoint_id", persistence.UTF8Value(s.baseCheckpointID)),
persistence.StructFieldValue("size", persistence.Uint64Value(s.size)),
persistence.StructFieldValue("storage_size", persistence.Uint64Value(s.storageSize)),
persistence.StructFieldValue("chunk_count", persistence.Uint32Value(s.chunkCount)),
persistence.StructFieldValue("lock_task_id", persistence.UTF8Value(s.lockTaskID)),
persistence.StructFieldValue("encryption_mode", persistence.Uint32Value(s.encryptionMode)),
persistence.StructFieldValue("encryption_keyhash", persistence.StringValue(s.encryptionKeyHash)),
persistence.StructFieldValue("status", persistence.Int64Value(int64(s.status))),
Expand All @@ -100,13 +106,16 @@ func scanSnapshotState(res persistence.Result) (state snapshotState, err error)
persistence.OptionalWithDefault("zone_id", &state.zoneID),
persistence.OptionalWithDefault("disk_id", &state.diskID),
persistence.OptionalWithDefault("checkpoint_id", &state.checkpointID),
persistence.OptionalWithDefault("create_task_id", &state.createTaskID),
persistence.OptionalWithDefault("creating_at", &state.creatingAt),
persistence.OptionalWithDefault("created_at", &state.createdAt),
persistence.OptionalWithDefault("deleting_at", &state.deletingAt),
persistence.OptionalWithDefault("base_snapshot_id", &state.baseSnapshotID),
persistence.OptionalWithDefault("base_checkpoint_id", &state.baseCheckpointID),
persistence.OptionalWithDefault("size", &state.size),
persistence.OptionalWithDefault("storage_size", &state.storageSize),
persistence.OptionalWithDefault("chunk_count", &state.chunkCount),
persistence.OptionalWithDefault("lock_task_id", &state.lockTaskID),
persistence.OptionalWithDefault("encryption_mode", &state.encryptionMode),
persistence.OptionalWithDefault("encryption_keyhash", &state.encryptionKeyHash),
persistence.OptionalWithDefault("status", &state.status),
Expand Down Expand Up @@ -143,13 +152,16 @@ func snapshotStateStructTypeString() string {
zone_id: Utf8,
disk_id: Utf8,
checkpoint_id: Utf8,
create_task_id: Utf8,
creating_at: Timestamp,
created_at: Timestamp,
deleting_at: Timestamp,
base_snapshot_id: Utf8,
base_checkpoint_id: Utf8,
size: Uint64,
storage_size: Uint64,
chunk_count: Uint32,
lock_task_id: Utf8,
encryption_mode: Uint32,
encryption_keyhash: String,
status: Int64>`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,13 +182,16 @@ func snapshotStateTableDescription() persistence.CreateTableDescription {
persistence.WithColumn("zone_id", persistence.Optional(persistence.TypeUTF8)),
persistence.WithColumn("disk_id", persistence.Optional(persistence.TypeUTF8)),
persistence.WithColumn("checkpoint_id", persistence.Optional(persistence.TypeUTF8)),
persistence.WithColumn("create_task_id", persistence.Optional(persistence.TypeUTF8)),
persistence.WithColumn("creating_at", persistence.Optional(persistence.TypeTimestamp)),
persistence.WithColumn("created_at", persistence.Optional(persistence.TypeTimestamp)),
persistence.WithColumn("deleting_at", persistence.Optional(persistence.TypeTimestamp)),
persistence.WithColumn("base_snapshot_id", persistence.Optional(persistence.TypeUTF8)),
persistence.WithColumn("base_checkpoint_id", persistence.Optional(persistence.TypeUTF8)),
persistence.WithColumn("size", persistence.Optional(persistence.TypeUint64)),
persistence.WithColumn("storage_size", persistence.Optional(persistence.TypeUint64)),
persistence.WithColumn("chunk_count", persistence.Optional(persistence.TypeUint32)),
persistence.WithColumn("lock_task_id", persistence.Optional(persistence.TypeUTF8)),
persistence.WithColumn("encryption_mode", persistence.Optional(persistence.TypeUint32)),
persistence.WithColumn("encryption_keyhash", persistence.Optional(persistence.TypeString)),
persistence.WithColumn("status", persistence.Optional(persistence.TypeInt64)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@ import (
////////////////////////////////////////////////////////////////////////////////

type SnapshotMeta struct {
ID string
Disk *types.Disk
CheckpointID string
BaseSnapshotID string
ID string
Disk *types.Disk
CheckpointID string
CreateTaskID string
BaseSnapshotID string
BaseCheckpointID string
// Snapshot virtual size, i.e. the minimum amount of disk space needed to restore.
Size uint64
// Snapshot real size, i.e. the amount of disk space occupied in storage.
Expand Down Expand Up @@ -50,7 +52,7 @@ type Storage interface {
encryption *types.EncryptionDesc,
) error

DeletingSnapshot(ctx context.Context, snapshotID string) error
DeletingSnapshot(ctx context.Context, snapshotID string) (*SnapshotMeta, error)

GetSnapshotsToDelete(
ctx context.Context,
Expand Down Expand Up @@ -118,4 +120,12 @@ type Storage interface {
zoneID string,
diskID string,
) error

LockSnapshot(
ctx context.Context,
snapshotID string,
lockTaskID string,
) (locked bool, err error)

UnlockSnapshot(ctx context.Context, snapshotID string, lockTaskID string) error
}
Original file line number Diff line number Diff line change
Expand Up @@ -138,9 +138,9 @@ func (s *legacyStorage) SnapshotCreated(
func (s *legacyStorage) DeletingSnapshot(
ctx context.Context,
snapshotID string,
) error {
) (*SnapshotMeta, error) {

return task_errors.NewNonRetriableErrorf("not implemented")
return nil, task_errors.NewNonRetriableErrorf("not implemented")
}

func (s *legacyStorage) GetSnapshotsToDelete(
Expand Down Expand Up @@ -250,3 +250,21 @@ func (s *legacyStorage) DeleteDiskFromIncremental(

return task_errors.NewNonRetriableErrorf("not implemented")
}

// LockSnapshot is a stub on legacyStorage: it ignores its arguments, reports
// locked == false, and returns the error produced by
// task_errors.NewNonRetriableErrorf("not implemented").
func (s *legacyStorage) LockSnapshot(
	ctx context.Context,
	snapshotID string,
	lockTaskID string,
) (locked bool, err error) {

	// Build the error first so the single return reads as "not locked, and why".
	err = task_errors.NewNonRetriableErrorf("not implemented")
	return false, err
}

// UnlockSnapshot is a stub on legacyStorage: it ignores its arguments and
// always returns the error produced by
// task_errors.NewNonRetriableErrorf("not implemented").
func (s *legacyStorage) UnlockSnapshot(
	ctx context.Context,
	snapshotID string,
	lockTaskID string,
) error {

	notImplemented := task_errors.NewNonRetriableErrorf("not implemented")
	return notImplemented
}
Loading

0 comments on commit 5c694dd

Please sign in to comment.