diff --git a/cloud/disk_manager/internal/pkg/dataplane/create_snapshot_from_disk_task.go b/cloud/disk_manager/internal/pkg/dataplane/create_snapshot_from_disk_task.go index c7da734af0a..6245e744889 100644 --- a/cloud/disk_manager/internal/pkg/dataplane/create_snapshot_from_disk_task.go +++ b/cloud/disk_manager/internal/pkg/dataplane/create_snapshot_from_disk_task.go @@ -12,6 +12,7 @@ import ( "github.com/ydb-platform/nbs/cloud/disk_manager/internal/pkg/dataplane/snapshot" "github.com/ydb-platform/nbs/cloud/disk_manager/internal/pkg/dataplane/snapshot/storage" "github.com/ydb-platform/nbs/cloud/tasks" + "github.com/ydb-platform/nbs/cloud/tasks/logging" ) //////////////////////////////////////////////////////////////////////////////// @@ -62,7 +63,30 @@ func (t *createSnapshotFromDiskTask) Cancel( return err } - return t.storage.DeletingSnapshot(ctx, t.request.DstSnapshotId) + snapshotMeta, err := t.storage.DeletingSnapshot(ctx, t.request.DstSnapshotId) + if err != nil { + return err + } + + // NBS-3192. + if len(snapshotMeta.BaseSnapshotID) != 0 { + err := t.storage.UnlockSnapshot( + ctx, + snapshotMeta.BaseSnapshotID, + execCtx.GetTaskID(), + ) + if err != nil { + return err + } + + logging.Info( + ctx, + "Successfully unlocked snapshot with id %v", + snapshotMeta.BaseSnapshotID, + ) + } + + return nil } func (t *createSnapshotFromDiskTask) GetMetadata( @@ -111,10 +135,10 @@ func (t *createSnapshotFromDiskTask) run( snapshotMeta, err := t.storage.CreateSnapshot( ctx, storage.SnapshotMeta{ - ID: t.request.DstSnapshotId, - Disk: t.request.SrcDisk, - CheckpointID: t.request.SrcDiskCheckpointId, - BaseSnapshotID: t.request.BaseSnapshotId, + ID: t.request.DstSnapshotId, + Disk: t.request.SrcDisk, + CheckpointID: t.request.SrcDiskCheckpointId, + CreateTaskID: execCtx.GetTaskID(), }, ) if err != nil { @@ -126,6 +150,56 @@ func (t *createSnapshotFromDiskTask) run( return nil } + nbsClient, err := t.nbsFactory.GetClient(ctx, t.request.SrcDisk.ZoneId) + if err != nil { + return err + } + + diskParams, err := nbsClient.Describe(ctx, t.request.SrcDisk.DiskId) + if err != nil { + return err + } + + baseSnapshotID := snapshotMeta.BaseSnapshotID + baseCheckpointID := snapshotMeta.BaseCheckpointID + + if diskParams.IsDiskRegistryBasedDisk { + // Should perform full snapshot of disk. + // TODO: enable incremental snapshots for such disks. + baseSnapshotID = "" + baseCheckpointID = "" + } + + if len(baseSnapshotID) != 0 { + // Lock base snapshot to prevent deletion. + locked, err := t.storage.LockSnapshot( + ctx, + snapshotMeta.BaseSnapshotID, + selfTaskID, + ) + if err != nil { + return err + } + + if locked { + logging.Info( + ctx, + "Successfully locked snapshot with id %v", + snapshotMeta.BaseSnapshotID, + ) + } else { + logging.Info( + ctx, + "Snapshot with id %v can't be locked", + snapshotMeta.BaseSnapshotID, + ) + + // Should perform full snapshot of disk. + baseSnapshotID = "" + baseCheckpointID = "" + } + } + client, err := t.nbsFactory.GetClient(ctx, t.request.SrcDisk.ZoneId) if err != nil { return err @@ -140,20 +214,15 @@ func (t *createSnapshotFromDiskTask) run( return err } - diskParams, err := client.Describe(ctx, t.request.SrcDisk.DiskId) - if err != nil { - return err - } - - incremental := len(t.request.BaseSnapshotId) != 0 + incremental := len(baseSnapshotID) != 0 source, err := nbs.NewDiskSource( ctx, client, t.request.SrcDisk.DiskId, proxyOverlayDiskID, - t.request.SrcDiskBaseCheckpointId, - t.request.SrcDiskCheckpointId, + baseSnapshotID, + baseCheckpointID, diskParams.EncryptionDesc, chunkSize, incremental, // duplicateChunkIndices @@ -267,6 +336,34 @@ func (t *createSnapshotFromDiskTask) run( return err } + if len(snapshotMeta.BaseSnapshotID) != 0 { + err := t.storage.UnlockSnapshot( + ctx, + snapshotMeta.BaseSnapshotID, + selfTaskID, + ) + if err != nil { + return err + } + + logging.Info( + ctx, + "Successfully unlocked snapshot with id %v", + snapshotMeta.BaseSnapshotID, + ) + } + + if len(snapshotMeta.BaseSnapshotID) != 0 { + err = nbsClient.DeleteCheckpoint( + ctx, + t.request.SrcDisk.DiskId, + snapshotMeta.BaseCheckpointID, + ) + if err != nil { + return err + } + } + return t.storage.SnapshotCreated( ctx, t.request.DstSnapshotId, diff --git a/cloud/disk_manager/internal/pkg/dataplane/create_snapshot_from_legacy_snapshot_task.go b/cloud/disk_manager/internal/pkg/dataplane/create_snapshot_from_legacy_snapshot_task.go index 86dbc3f2981..3cfd3a0414e 100644 --- a/cloud/disk_manager/internal/pkg/dataplane/create_snapshot_from_legacy_snapshot_task.go +++ b/cloud/disk_manager/internal/pkg/dataplane/create_snapshot_from_legacy_snapshot_task.go @@ -155,7 +155,8 @@ func (t *createSnapshotFromLegacySnapshotTask) Cancel( execCtx tasks.ExecutionContext, ) error { - return t.storage.DeletingSnapshot(ctx, t.request.DstSnapshotId) + _, err := t.storage.DeletingSnapshot(ctx, t.request.DstSnapshotId) + return err } func (t *createSnapshotFromLegacySnapshotTask) GetMetadata( diff --git a/cloud/disk_manager/internal/pkg/dataplane/create_snapshot_from_snapshot_task.go b/cloud/disk_manager/internal/pkg/dataplane/create_snapshot_from_snapshot_task.go index e86232527b3..494dee3fb67 100644 --- a/cloud/disk_manager/internal/pkg/dataplane/create_snapshot_from_snapshot_task.go +++ b/cloud/disk_manager/internal/pkg/dataplane/create_snapshot_from_snapshot_task.go @@ -100,7 +100,8 @@ func (t *createSnapshotFromSnapshotTask) Cancel( execCtx tasks.ExecutionContext, ) error { - return t.storage.DeletingSnapshot(ctx, t.request.DstSnapshotId) + _, err := t.storage.DeletingSnapshot(ctx, t.request.DstSnapshotId) + return err } func (t *createSnapshotFromSnapshotTask) GetMetadata( diff --git a/cloud/disk_manager/internal/pkg/dataplane/create_snapshot_from_url_task.go b/cloud/disk_manager/internal/pkg/dataplane/create_snapshot_from_url_task.go index 2bfedff110e..3e698f334cc 100644 --- a/cloud/disk_manager/internal/pkg/dataplane/create_snapshot_from_url_task.go +++ b/cloud/disk_manager/internal/pkg/dataplane/create_snapshot_from_url_task.go @@ -176,7 +176,8 @@ func (t *createSnapshotFromURLTask) Cancel( execCtx tasks.ExecutionContext, ) error { - return t.storage.DeletingSnapshot(ctx, t.request.DstSnapshotId) + _, err := t.storage.DeletingSnapshot(ctx, t.request.DstSnapshotId) + return err } func (t *createSnapshotFromURLTask) GetMetadata( diff --git a/cloud/disk_manager/internal/pkg/dataplane/delete_snapshot_task.go b/cloud/disk_manager/internal/pkg/dataplane/delete_snapshot_task.go index 3dbddc1aec3..118620ece41 100644 --- a/cloud/disk_manager/internal/pkg/dataplane/delete_snapshot_task.go +++ b/cloud/disk_manager/internal/pkg/dataplane/delete_snapshot_task.go @@ -9,6 +9,7 @@ import ( "github.com/ydb-platform/nbs/cloud/disk_manager/internal/pkg/dataplane/snapshot/storage" "github.com/ydb-platform/nbs/cloud/tasks" "github.com/ydb-platform/nbs/cloud/tasks/errors" + "github.com/ydb-platform/nbs/cloud/tasks/logging" ) //////////////////////////////////////////////////////////////////////////////// @@ -39,7 +40,29 @@ func (t *deleteSnapshotTask) deletingSnapshot( execCtx tasks.ExecutionContext, ) error { - return t.storage.DeletingSnapshot(ctx, t.request.SnapshotId) + snapshotMeta, err := t.storage.DeletingSnapshot(ctx, t.request.SnapshotId) + if err != nil { + return err + } + + if len(snapshotMeta.BaseSnapshotID) != 0 { + err := t.storage.UnlockSnapshot( + ctx, + snapshotMeta.BaseSnapshotID, + snapshotMeta.CreateTaskID, + ) + if err != nil { + return err + } + + logging.Info( + ctx, + "Successfully unlocked snapshot with id %v", + snapshotMeta.BaseSnapshotID, + ) + } + + return nil } func (t *deleteSnapshotTask) Run( diff --git a/cloud/disk_manager/internal/pkg/dataplane/snapshot/storage/common.go b/cloud/disk_manager/internal/pkg/dataplane/snapshot/storage/common.go index 3331bb5ce39..bd3da074e3f 100644 --- a/cloud/disk_manager/internal/pkg/dataplane/snapshot/storage/common.go +++ b/cloud/disk_manager/internal/pkg/dataplane/snapshot/storage/common.go @@ -48,13 +48,16 @@ type snapshotState struct { zoneID string diskID string checkpointID string + createTaskID string creatingAt time.Time createdAt time.Time deletingAt time.Time baseSnapshotID string + baseCheckpointID string size uint64 storageSize uint64 chunkCount uint32 + lockTaskID string encryptionMode uint32 encryptionKeyHash []byte status snapshotStatus @@ -81,13 +84,16 @@ func (s *snapshotState) structValue() persistence.Value { persistence.StructFieldValue("zone_id", persistence.UTF8Value(s.zoneID)), persistence.StructFieldValue("disk_id", persistence.UTF8Value(s.diskID)), persistence.StructFieldValue("checkpoint_id", persistence.UTF8Value(s.checkpointID)), + persistence.StructFieldValue("create_task_id", persistence.UTF8Value(s.createTaskID)), persistence.StructFieldValue("creating_at", persistence.TimestampValue(s.creatingAt)), persistence.StructFieldValue("created_at", persistence.TimestampValue(s.createdAt)), persistence.StructFieldValue("deleting_at", persistence.TimestampValue(s.deletingAt)), persistence.StructFieldValue("base_snapshot_id", persistence.UTF8Value(s.baseSnapshotID)), + persistence.StructFieldValue("base_checkpoint_id", persistence.UTF8Value(s.baseCheckpointID)), persistence.StructFieldValue("size", persistence.Uint64Value(s.size)), persistence.StructFieldValue("storage_size", persistence.Uint64Value(s.storageSize)), persistence.StructFieldValue("chunk_count", persistence.Uint32Value(s.chunkCount)), + persistence.StructFieldValue("lock_task_id", persistence.UTF8Value(s.lockTaskID)), persistence.StructFieldValue("encryption_mode", persistence.Uint32Value(s.encryptionMode)), persistence.StructFieldValue("encryption_keyhash", persistence.StringValue(s.encryptionKeyHash)), persistence.StructFieldValue("status", persistence.Int64Value(int64(s.status))), @@ -100,13 +106,16 @@ func scanSnapshotState(res persistence.Result) (state snapshotState, err error) persistence.OptionalWithDefault("zone_id", &state.zoneID), persistence.OptionalWithDefault("disk_id", &state.diskID), persistence.OptionalWithDefault("checkpoint_id", &state.checkpointID), + persistence.OptionalWithDefault("create_task_id", &state.createTaskID), persistence.OptionalWithDefault("creating_at", &state.creatingAt), persistence.OptionalWithDefault("created_at", &state.createdAt), persistence.OptionalWithDefault("deleting_at", &state.deletingAt), persistence.OptionalWithDefault("base_snapshot_id", &state.baseSnapshotID), + persistence.OptionalWithDefault("base_checkpoint_id", &state.baseCheckpointID), persistence.OptionalWithDefault("size", &state.size), persistence.OptionalWithDefault("storage_size", &state.storageSize), persistence.OptionalWithDefault("chunk_count", &state.chunkCount), + persistence.OptionalWithDefault("lock_task_id", &state.lockTaskID), persistence.OptionalWithDefault("encryption_mode", &state.encryptionMode), persistence.OptionalWithDefault("encryption_keyhash", &state.encryptionKeyHash), persistence.OptionalWithDefault("status", &state.status), @@ -143,13 +152,16 @@ func snapshotStateStructTypeString() string { zone_id: Utf8, disk_id: Utf8, checkpoint_id: Utf8, + create_task_id: Utf8, creating_at: Timestamp, created_at: Timestamp, deleting_at: Timestamp, base_snapshot_id: Utf8, + base_checkpoint_id: Utf8, size: Uint64, storage_size: Uint64, chunk_count: Uint32, + lock_task_id: Utf8, encryption_mode: Uint32, encryption_keyhash: String, status: Int64>` diff --git a/cloud/disk_manager/internal/pkg/dataplane/snapshot/storage/schema/schema.go b/cloud/disk_manager/internal/pkg/dataplane/snapshot/storage/schema/schema.go index 09a9c64e3ff..5edb59abce0 100644 --- a/cloud/disk_manager/internal/pkg/dataplane/snapshot/storage/schema/schema.go +++ b/cloud/disk_manager/internal/pkg/dataplane/snapshot/storage/schema/schema.go @@ -182,13 +182,16 @@ func snapshotStateTableDescription() persistence.CreateTableDescription { persistence.WithColumn("zone_id", persistence.Optional(persistence.TypeUTF8)), persistence.WithColumn("disk_id", persistence.Optional(persistence.TypeUTF8)), persistence.WithColumn("checkpoint_id", persistence.Optional(persistence.TypeUTF8)), + persistence.WithColumn("create_task_id", persistence.Optional(persistence.TypeUTF8)), persistence.WithColumn("creating_at", persistence.Optional(persistence.TypeTimestamp)), persistence.WithColumn("created_at", persistence.Optional(persistence.TypeTimestamp)), persistence.WithColumn("deleting_at", persistence.Optional(persistence.TypeTimestamp)), persistence.WithColumn("base_snapshot_id", persistence.Optional(persistence.TypeUTF8)), + persistence.WithColumn("base_checkpoint_id", persistence.Optional(persistence.TypeUTF8)), persistence.WithColumn("size", persistence.Optional(persistence.TypeUint64)), persistence.WithColumn("storage_size", persistence.Optional(persistence.TypeUint64)), persistence.WithColumn("chunk_count", persistence.Optional(persistence.TypeUint32)), + persistence.WithColumn("lock_task_id", persistence.Optional(persistence.TypeUTF8)), persistence.WithColumn("encryption_mode", persistence.Optional(persistence.TypeUint32)), persistence.WithColumn("encryption_keyhash", persistence.Optional(persistence.TypeString)), persistence.WithColumn("status", persistence.Optional(persistence.TypeInt64)), diff --git a/cloud/disk_manager/internal/pkg/dataplane/snapshot/storage/storage.go b/cloud/disk_manager/internal/pkg/dataplane/snapshot/storage/storage.go index af769309782..47ec45b25e9 100644 --- a/cloud/disk_manager/internal/pkg/dataplane/snapshot/storage/storage.go +++ b/cloud/disk_manager/internal/pkg/dataplane/snapshot/storage/storage.go @@ -12,10 +12,12 @@ import ( //////////////////////////////////////////////////////////////////////////////// type SnapshotMeta struct { - ID string - Disk *types.Disk - CheckpointID string - BaseSnapshotID string + ID string + Disk *types.Disk + CheckpointID string + CreateTaskID string + BaseSnapshotID string + BaseCheckpointID string // Snapshot virtual size, i.e. the minimum amount of disk space needed to restore. Size uint64 // Snapshot real size, i.e. the amount of disk space occupied in storage. @@ -50,7 +52,7 @@ type Storage interface { encryption *types.EncryptionDesc, ) error - DeletingSnapshot(ctx context.Context, snapshotID string) error + DeletingSnapshot(ctx context.Context, snapshotID string) (*SnapshotMeta, error) GetSnapshotsToDelete( ctx context.Context, @@ -118,4 +120,12 @@ type Storage interface { zoneID string, diskID string, ) error + + LockSnapshot( + ctx context.Context, + snapshotID string, + lockTaskID string, + ) (locked bool, err error) + + UnlockSnapshot(ctx context.Context, snapshotID string, lockTaskID string) error } diff --git a/cloud/disk_manager/internal/pkg/dataplane/snapshot/storage/storage_legacy.go b/cloud/disk_manager/internal/pkg/dataplane/snapshot/storage/storage_legacy.go index f358304edcd..909baa044c1 100644 --- a/cloud/disk_manager/internal/pkg/dataplane/snapshot/storage/storage_legacy.go +++ b/cloud/disk_manager/internal/pkg/dataplane/snapshot/storage/storage_legacy.go @@ -138,9 +138,9 @@ func (s *legacyStorage) SnapshotCreated( func (s *legacyStorage) DeletingSnapshot( ctx context.Context, snapshotID string, -) error { +) (*SnapshotMeta, error) { - return task_errors.NewNonRetriableErrorf("not implemented") + return nil, task_errors.NewNonRetriableErrorf("not implemented") } func (s *legacyStorage) GetSnapshotsToDelete( @@ -250,3 +250,21 @@ func (s *legacyStorage) DeleteDiskFromIncremental( return task_errors.NewNonRetriableErrorf("not implemented") } + +func (s *legacyStorage) LockSnapshot( + ctx context.Context, + snapshotID string, + lockTaskID string, +) (locked bool, err error) { + + return false, task_errors.NewNonRetriableErrorf("not implemented") +} + +func (s *legacyStorage) UnlockSnapshot( + ctx context.Context, + snapshotID string, + lockTaskID string, +) error { + + return task_errors.NewNonRetriableErrorf("not implemented") +} diff --git a/cloud/disk_manager/internal/pkg/dataplane/snapshot/storage/storage_ydb.go b/cloud/disk_manager/internal/pkg/dataplane/snapshot/storage/storage_ydb.go index cbf90d206bf..2ecc894096d 100644 --- a/cloud/disk_manager/internal/pkg/dataplane/snapshot/storage/storage_ydb.go +++ b/cloud/disk_manager/internal/pkg/dataplane/snapshot/storage/storage_ydb.go @@ -73,18 +73,23 @@ func (s *storageYDB) SnapshotCreated( func (s *storageYDB) DeletingSnapshot( ctx context.Context, snapshotID string, -) error { +) (*SnapshotMeta, error) { - return s.db.Execute( + var deleting *SnapshotMeta + + err := s.db.Execute( ctx, func(ctx context.Context, session *persistence.Session) error { - return s.deletingSnapshot( + var err error + deleting, err = s.deletingSnapshot( ctx, session, snapshotID, ) + return err }, ) + return deleting, err } func (s *storageYDB) DeleteSnapshotData( @@ -179,3 +184,33 @@ func (s *storageYDB) DeleteDiskFromIncremental( }, ) } + +func (s *storageYDB) LockSnapshot( + ctx context.Context, + snapshotID string, + lockTaskID string, +) (locked bool, err error) { + + err = s.db.Execute( + ctx, + func(ctx context.Context, session *persistence.Session) error { + locked, err = s.lockSnapshot(ctx, session, snapshotID, lockTaskID) + return err + }, + ) + return locked, err +} + +func (s *storageYDB) UnlockSnapshot( + ctx context.Context, + snapshotID string, + lockTaskID string, +) error { + + return s.db.Execute( + ctx, + func(ctx context.Context, session *persistence.Session) error { + return s.unlockSnapshot(ctx, session, snapshotID, lockTaskID) + }, + ) +} diff --git a/cloud/disk_manager/internal/pkg/dataplane/snapshot/storage/storage_ydb_impl.go b/cloud/disk_manager/internal/pkg/dataplane/snapshot/storage/storage_ydb_impl.go index c365250b662..839b0213c08 100644 --- a/cloud/disk_manager/internal/pkg/dataplane/snapshot/storage/storage_ydb_impl.go +++ b/cloud/disk_manager/internal/pkg/dataplane/snapshot/storage/storage_ydb_impl.go @@ -12,6 +12,7 @@ import ( "github.com/ydb-platform/nbs/cloud/disk_manager/internal/pkg/dataplane/snapshot/storage/chunks" "github.com/ydb-platform/nbs/cloud/disk_manager/internal/pkg/dataplane/snapshot/storage/protos" "github.com/ydb-platform/nbs/cloud/disk_manager/internal/pkg/types" + "github.com/ydb-platform/nbs/cloud/tasks/errors" task_errors "github.com/ydb-platform/nbs/cloud/tasks/errors" "github.com/ydb-platform/nbs/cloud/tasks/logging" "github.com/ydb-platform/nbs/cloud/tasks/persistence" @@ -36,6 +37,45 @@ func makeShardID(s string) uint64 { //////////////////////////////////////////////////////////////////////////////// +func (s *storageYDB) getIncremental( + ctx context.Context, + tx *persistence.Transaction, + disk *types.Disk, +) (snapshotID string, checkpointID string, err error) { + + res, err := tx.Execute(ctx, fmt.Sprintf(` + --!syntax_v1 + pragma TablePathPrefix = "%v"; + declare $zone_id as Utf8; + declare $disk_id as Utf8; + + select * + from incremental + where zone_id = $zone_id and disk_id = $disk_id + `, s.tablesPath), + persistence.ValueParam("$zone_id", persistence.UTF8Value(disk.ZoneId)), + persistence.ValueParam("$disk_id", persistence.UTF8Value(disk.DiskId)), + ) + if err != nil { + return "", "", err + } + defer res.Close() + + for res.NextResultSet(ctx) { + for res.NextRow() { + err = res.ScanNamed( + persistence.OptionalWithDefault("snapshot_id", &snapshotID), + persistence.OptionalWithDefault("checkpoint_id", &checkpointID), + ) + if err != nil { + return "", "", err + } + } + } + + return +} + func (s *storageYDB) createSnapshot( ctx context.Context, session *persistence.Session, @@ -91,15 +131,26 @@ func (s *storageYDB) createSnapshot( } state := snapshotState{ - id: snapshotMeta.ID, - creatingAt: time.Now(), - status: snapshotStatusCreating, + id: snapshotMeta.ID, + createTaskID: snapshotMeta.CreateTaskID, + creatingAt: time.Now(), + status: snapshotStatusCreating, } if snapshotMeta.Disk != nil { state.zoneID = snapshotMeta.Disk.ZoneId state.diskID = snapshotMeta.Disk.DiskId state.checkpointID = snapshotMeta.CheckpointID - state.baseSnapshotID = snapshotMeta.BaseSnapshotID + + baseSnapshotID, baseCheckpointID, err := s.getIncremental( + ctx, + tx, + snapshotMeta.Disk, + ) + if err != nil { + return nil, err + } + state.baseSnapshotID = baseSnapshotID + state.baseCheckpointID = baseCheckpointID } _, err = tx.Execute(ctx, fmt.Sprintf(` @@ -333,7 +384,7 @@ func (s *storageYDB) deletingSnapshot( ctx context.Context, session *persistence.Session, snapshotID string, -) (err error) { +) (deleting *SnapshotMeta, err error) { defer s.metrics.StatOperation("deletingSnapshot")(&err) @@ -341,7 +392,7 @@ func (s *storageYDB) deletingSnapshot( tx, err := session.BeginRWTransaction(ctx) if err != nil { - return err + return nil, err } defer tx.Rollback(ctx) @@ -357,13 +408,13 @@ func (s *storageYDB) deletingSnapshot( persistence.ValueParam("$id", persistence.UTF8Value(snapshotID)), ) if err != nil { - return err + return nil, err } defer res.Close() states, err := scanSnapshotStates(ctx, res) if err != nil { - return err + return nil, err } var state snapshotState @@ -376,11 +427,11 @@ func (s *storageYDB) deletingSnapshot( err = tx.Commit(ctx) if err != nil { - return err + return nil, err } // Should be idempotent. - return nil + return state.toSnapshotMeta(), err } } @@ -400,7 +451,7 @@ func (s *storageYDB) deletingSnapshot( persistence.ValueParam("$states", persistence.ListValue(state.structValue())), ) if err != nil { - return err + return nil, err } _, err = tx.Execute(ctx, fmt.Sprintf(` @@ -416,7 +467,7 @@ func (s *storageYDB) deletingSnapshot( persistence.ValueParam("$snapshot_id", persistence.UTF8Value(snapshotID)), ) if err != nil { - return err + return nil, err } _, err = tx.Execute(ctx, fmt.Sprintf(` @@ -434,10 +485,10 @@ func (s *storageYDB) deletingSnapshot( persistence.ValueParam("$snapshot_id", persistence.UTF8Value(snapshotID)), ) if err != nil { - return err + return nil, err } - return tx.Commit(ctx) + return state.toSnapshotMeta(), tx.Commit(ctx) } func (s *storageYDB) GetSnapshotsToDelete( @@ -1124,3 +1175,167 @@ func (s *storageYDB) getChunkStorage(useS3 bool) chunks.Storage { return s.chunkStorageYDB } } + +func (s *storageYDB) lockSnapshot( + ctx context.Context, + session *persistence.Session, + snapshotID string, + lockTaskID string, +) (locked bool, err error) { + + tx, err := session.BeginRWTransaction(ctx) + if err != nil { + return false, err + } + defer tx.Rollback(ctx) + + res, err := tx.Execute(ctx, fmt.Sprintf(` + --!syntax_v1 + pragma TablePathPrefix = "%v"; + declare $id as Utf8; + + select * + from snapshots + where id = $id + `, s.tablesPath), + persistence.ValueParam("$id", persistence.UTF8Value(snapshotID)), + ) + if err != nil { + return false, err + } + defer res.Close() + + states, err := scanSnapshotStates(ctx, res) + if err != nil { + return false, err + } + + if len(states) == 0 { + return false, tx.Commit(ctx) + } + + state := states[0] + if state.status >= snapshotStatusDeleting { + return false, tx.Commit(ctx) + } + + if len(state.lockTaskID) != 0 { + err = tx.Commit(ctx) + if err != nil { + return false, err + } + + if state.lockTaskID == lockTaskID { + // Should be idempotent. + return true, nil + } + + // Unlikely situation. Another lock is found. + return false, errors.NewInterruptExecutionError() + } + + state.lockTaskID = lockTaskID + + _, err = tx.Execute(ctx, fmt.Sprintf(` + --!syntax_v1 + pragma TablePathPrefix = "%v"; + declare $states as List<%v>; + + upsert into snapshots + select * + from AS_TABLE($states) + `, s.tablesPath, snapshotStateStructTypeString()), + persistence.ValueParam("$states", persistence.ListValue(state.structValue())), + ) + if err != nil { + return false, err + } + + err = tx.Commit(ctx) + if err != nil { + return false, err + } + + return true, nil +} + +func (s *storageYDB) unlockSnapshot( + ctx context.Context, + session *persistence.Session, + snapshotID string, + lockTaskID string, +) error { + + tx, err := session.BeginRWTransaction(ctx) + if err != nil { + return err + } + defer tx.Rollback(ctx) + + res, err := tx.Execute(ctx, fmt.Sprintf(` + --!syntax_v1 + pragma TablePathPrefix = "%v"; + declare $id as Utf8; + + select * + from snapshots + where id = $id + `, s.tablesPath), + persistence.ValueParam("$id", persistence.UTF8Value(snapshotID)), + ) + if err != nil { + return err + } + defer res.Close() + + states, err := scanSnapshotStates(ctx, res) + if err != nil { + return err + } + + if len(states) == 0 { + // Should be idempotent. + return tx.Commit(ctx) + } + + state := states[0] + if state.status >= snapshotStatusDeleting { + // Should be idempotent. + return tx.Commit(ctx) + } + + if len(state.lockTaskID) == 0 { + // Should be idempotent. + return tx.Commit(ctx) + } + + if state.lockTaskID != lockTaskID { + // Our lock is not present, so it's a success. + return tx.Commit(ctx) + } + + state.lockTaskID = "" + + _, err = tx.Execute(ctx, fmt.Sprintf(` + --!syntax_v1 + pragma TablePathPrefix = "%v"; + declare $states as List<%v>; + + upsert into snapshots + select * + from AS_TABLE($states) + `, s.tablesPath, snapshotStateStructTypeString()), + persistence.ValueParam("$states", persistence.ListValue(state.structValue())), + ) + if err != nil { + return err + } + + err = tx.Commit(ctx) + if err != nil { + return err + } + + logging.Info(ctx, "Successfully unlocked snapshot with id %v", snapshotID) + return nil +} diff --git a/cloud/disk_manager/internal/pkg/facade/snapshot_service_test/ya.make b/cloud/disk_manager/internal/pkg/facade/snapshot_service_test/ya.make index 9707ccc300d..bc23b9e1319 100644 --- a/cloud/disk_manager/internal/pkg/facade/snapshot_service_test/ya.make +++ b/cloud/disk_manager/internal/pkg/facade/snapshot_service_test/ya.make @@ -3,6 +3,8 @@ GO_TEST_FOR(cloud/disk_manager/internal/pkg/facade) SET_APPEND(RECIPE_ARGS --creation-and-deletion-allowed-only-for-disks-with-id-prefix "Test") INCLUDE(${ARCADIA_ROOT}/cloud/disk_manager/internal/pkg/facade/testcommon/common.inc) +TIMEOUT(120) + GO_XTEST_SRCS( snapshot_service_test.go ) diff --git a/cloud/disk_manager/internal/pkg/resources/disks.go b/cloud/disk_manager/internal/pkg/resources/disks.go index d29103240e8..04a7dd4d434 100644 --- a/cloud/disk_manager/internal/pkg/resources/disks.go +++ b/cloud/disk_manager/internal/pkg/resources/disks.go @@ -220,7 +220,7 @@ func diskStateStructTypeString() string { scanned_at: Timestamp, scan_found_broken_blobs: Bool, - + fill_generation: Uint64>` } @@ -537,11 +537,6 @@ func (s *storageYDB) deleteDisk( return nil, err } - err = s.deleteDiskFromIncremental(ctx, tx, state.id, state.zoneID) - if err != nil { - return nil, err - } - err = tx.Commit(ctx) if err != nil { return nil, err @@ -1095,12 +1090,7 @@ func (s *storageYDB) DiskRelocated( state.zoneID = dstZoneID - err = s.updateDiskState(ctx, tx, state) - if err != nil { - return err - } - - return s.deleteDiskFromIncremental(ctx, tx, diskID, oldZoneID) + return s.updateDiskState(ctx, tx, state) } //////////////////////////////////////////////////////////////////////////////// diff --git a/cloud/disk_manager/internal/pkg/resources/snapshots.go b/cloud/disk_manager/internal/pkg/resources/snapshots.go index cac3f4a8e1c..1d9af6cdc09 100644 --- a/cloud/disk_manager/internal/pkg/resources/snapshots.go +++ b/cloud/disk_manager/internal/pkg/resources/snapshots.go @@ -63,8 +63,8 @@ type snapshotState struct { deleteTaskID string deletingAt time.Time deletedAt time.Time - baseSnapshotID string - baseCheckpointID string + baseSnapshotID string // todo remove + baseCheckpointID string // todo remove useDataplaneTasks bool size uint64 storageSize uint64 @@ -276,67 +276,6 @@ func (s *storageYDB) snapshotExists( return count != 0, nil } -func (s *storageYDB) getIncremental( - ctx context.Context, - tx *persistence.Transaction, - disk *types.Disk, -) (snapshotID string, checkpointID string, err error) { - - res, err := tx.Execute(ctx, fmt.Sprintf(` - --!syntax_v1 - pragma TablePathPrefix = "%v"; - declare $zone_id as Utf8; - declare $disk_id as Utf8; - - select * - from incremental - where zone_id = $zone_id and disk_id = $disk_id - `, s.snapshotsPath), - persistence.ValueParam("$zone_id", persistence.UTF8Value(disk.ZoneId)), - persistence.ValueParam("$disk_id", persistence.UTF8Value(disk.DiskId)), - ) - if err != nil { - return "", "", err - } - defer res.Close() - - for res.NextResultSet(ctx) { - for res.NextRow() { - err = res.ScanNamed( - persistence.OptionalWithDefault("snapshot_id", &snapshotID), - persistence.OptionalWithDefault("checkpoint_id", &checkpointID), - ) - if err != nil { - return "", "", err - } - } - } - - return -} - -func (s *storageYDB) deleteDiskFromIncremental( - ctx context.Context, - tx *persistence.Transaction, - diskID string, - zoneID string, -) error { - - _, err := tx.Execute(ctx, fmt.Sprintf(` - --!syntax_v1 - pragma TablePathPrefix = "%v"; - declare $zone_id as Utf8; - declare $disk_id as Utf8; - - delete from incremental - where zone_id = $zone_id and disk_id = $disk_id - `, s.snapshotsPath), - persistence.ValueParam("$zone_id", persistence.UTF8Value(zoneID)), - persistence.ValueParam("$disk_id", persistence.UTF8Value(diskID)), - ) - return err -} - func (s *storageYDB) getSnapshotMeta( ctx context.Context, session *persistence.Session, @@ -467,15 +406,6 @@ func (s *storageYDB) createSnapshot( return nil, nil } - baseSnapshotID, baseCheckpointID, err := s.getIncremental( - ctx, - tx, - snapshot.Disk, - ) - if err != nil { - return nil, err - } - state := snapshotState{ id: snapshot.ID, folderID: snapshot.FolderID, @@ -486,8 +416,6 @@ func (s *storageYDB) createSnapshot( createTaskID: snapshot.CreateTaskID, creatingAt: snapshot.CreatingAt, createdBy: snapshot.CreatedBy, - baseSnapshotID: baseSnapshotID, - baseCheckpointID: baseCheckpointID, useDataplaneTasks: snapshot.UseDataplaneTasks, status: snapshotStatusCreating, @@ -615,54 +543,6 @@ func (s *storageYDB) snapshotCreated( return err } - if len(state.baseSnapshotID) == 0 { - _, err := tx.Execute(ctx, fmt.Sprintf(` - --!syntax_v1 - pragma TablePathPrefix = "%v"; - declare $zone_id as Utf8; - declare $disk_id as Utf8; - declare $snapshot_id as Utf8; - declare $checkpoint_id as Utf8; - - upsert into incremental (zone_id, disk_id, snapshot_id, checkpoint_id) - values ($zone_id, $disk_id, $snapshot_id, $checkpoint_id) - `, s.snapshotsPath), - persistence.ValueParam("$zone_id", persistence.UTF8Value(state.zoneID)), - persistence.ValueParam("$disk_id", persistence.UTF8Value(state.diskID)), - persistence.ValueParam("$snapshot_id", persistence.UTF8Value(snapshotID)), - persistence.ValueParam("$checkpoint_id", persistence.UTF8Value(state.checkpointID)), - ) - if err != nil { - return err - } - } else { - // Remove previous incremental snapshot and insert new one instead. - _, err := tx.Execute(ctx, fmt.Sprintf(` - --!syntax_v1 - pragma TablePathPrefix = "%v"; - declare $zone_id as Utf8; - declare $disk_id as Utf8; - declare $snapshot_id as Utf8; - declare $checkpoint_id as Utf8; - declare $base_snapshot_id as Utf8; - - delete from incremental - where zone_id = $zone_id and disk_id = $disk_id and snapshot_id = $base_snapshot_id; - - upsert into incremental (zone_id, disk_id, snapshot_id, checkpoint_id) - values ($zone_id, $disk_id, $snapshot_id, $checkpoint_id) - `, s.snapshotsPath), - persistence.ValueParam("$zone_id", persistence.UTF8Value(state.zoneID)), - persistence.ValueParam("$disk_id", persistence.UTF8Value(state.diskID)), - persistence.ValueParam("$snapshot_id", persistence.UTF8Value(snapshotID)), - persistence.ValueParam("$checkpoint_id", persistence.UTF8Value(state.checkpointID)), - persistence.ValueParam("$base_snapshot_id", persistence.UTF8Value(state.baseSnapshotID)), - ) - if err != nil { - return err - } - } - return tx.Commit(ctx) } @@ -774,24 +654,6 @@ func (s *storageYDB) deleteSnapshot( return nil, err } - _, err = tx.Execute(ctx, fmt.Sprintf(` - --!syntax_v1 - pragma TablePathPrefix = "%v"; - declare $zone_id as Utf8; - declare $disk_id as Utf8; - declare $snapshot_id as Utf8; - - delete from incremental - where zone_id = $zone_id and disk_id = $disk_id and snapshot_id = $snapshot_id - `, s.snapshotsPath), - persistence.ValueParam("$zone_id", persistence.UTF8Value(state.zoneID)), - persistence.ValueParam("$disk_id", persistence.UTF8Value(state.diskID)), - persistence.ValueParam("$snapshot_id", persistence.UTF8Value(snapshotID)), - ) - if err != nil { - return nil, err - } - err = tx.Commit(ctx) if err != nil { return nil, err diff --git a/cloud/disk_manager/internal/pkg/resources/storage.go b/cloud/disk_manager/internal/pkg/resources/storage.go index 6ec028bc8c4..ab95624545e 100644 --- a/cloud/disk_manager/internal/pkg/resources/storage.go +++ b/cloud/disk_manager/internal/pkg/resources/storage.go @@ -190,14 +190,6 @@ type Storage interface { ClearDeletedSnapshots(ctx context.Context, deletedBefore time.Time, limit int) error - LockSnapshot( - ctx context.Context, - snapshotID string, - lockTaskID string, - ) (locked bool, err error) - - UnlockSnapshot(ctx context.Context, snapshotID string, lockTaskID string) error - // Lists all existing snapshot ids in specified |folderID|. // Lists all existing snapshot ids if |folderID| is not set. ListSnapshots( diff --git a/cloud/disk_manager/internal/pkg/services/snapshots/create_snapshot_from_disk_task.go b/cloud/disk_manager/internal/pkg/services/snapshots/create_snapshot_from_disk_task.go index 9dd0108c9fa..2193d7b1801 100644 --- a/cloud/disk_manager/internal/pkg/services/snapshots/create_snapshot_from_disk_task.go +++ b/cloud/disk_manager/internal/pkg/services/snapshots/create_snapshot_from_disk_task.go @@ -49,14 +49,14 @@ func (t *createSnapshotFromDiskTask) run( execCtx tasks.ExecutionContext, nbsClient nbs.Client, checkpointID string, -) (*resources.SnapshotMeta, error) { +) error { disk := t.request.SrcDisk selfTaskID := execCtx.GetTaskID() diskParams, err := nbsClient.Describe(ctx, disk.DiskId) if err != nil { - return nil, err + return err } snapshotMeta, err := t.storage.CreateSnapshot(ctx, resources.SnapshotMeta{ @@ -72,11 +72,11 @@ func (t *createSnapshotFromDiskTask) run( Encryption: diskParams.EncryptionDesc, }) if err != nil { - return nil, err + return err } if snapshotMeta == nil { - return nil, errors.NewNonCancellableErrorf( + return errors.NewNonCancellableErrorf( "id %v is not accepted", t.request.DstSnapshotId, ) @@ -84,7 +84,7 @@ func (t *createSnapshotFromDiskTask) run( if snapshotMeta.Ready { // Already created. - return snapshotMeta, nil + return nil } err = nbsClient.CreateCheckpoint( @@ -95,52 +95,12 @@ func (t *createSnapshotFromDiskTask) run( }, ) if err != nil { - return nil, err + return err } err = t.ensureCheckpointReady(ctx, nbsClient, disk.DiskId, checkpointID) if err != nil { - return nil, err - } - - baseSnapshotID := snapshotMeta.BaseSnapshotID - baseCheckpointID := snapshotMeta.BaseCheckpointID - - if diskParams.IsDiskRegistryBasedDisk { - // Should perform full snapshot of disk. - // TODO: enable incremental snapshots for such disks. - baseSnapshotID = "" - baseCheckpointID = "" - } - - if len(baseSnapshotID) != 0 { - // Lock base snapshot to prevent deletion. - locked, err := t.storage.LockSnapshot( - ctx, - snapshotMeta.BaseSnapshotID, - selfTaskID, - ) - if err != nil { - return nil, err - } - - if locked { - logging.Info( - ctx, - "Successfully locked snapshot with id %v", - snapshotMeta.BaseSnapshotID, - ) - } else { - logging.Info( - ctx, - "Snapshot with id %v can't be locked", - snapshotMeta.BaseSnapshotID, - ) - - // Should perform full snapshot of disk. - baseSnapshotID = "" - baseCheckpointID = "" - } + return err } taskID, err := t.scheduler.ScheduleZonalTask( @@ -149,29 +109,27 @@ func (t *createSnapshotFromDiskTask) run( "", disk.ZoneId, &dataplane_protos.CreateSnapshotFromDiskRequest{ - SrcDisk: disk, - SrcDiskBaseCheckpointId: baseCheckpointID, - SrcDiskCheckpointId: checkpointID, - BaseSnapshotId: baseSnapshotID, - DstSnapshotId: t.request.DstSnapshotId, - UseS3: t.request.UseS3, - UseProxyOverlayDisk: t.request.UseProxyOverlayDisk, + SrcDisk: disk, + SrcDiskCheckpointId: checkpointID, + DstSnapshotId: t.request.DstSnapshotId, + UseS3: t.request.UseS3, + UseProxyOverlayDisk: t.request.UseProxyOverlayDisk, }, ) if err != nil { - return nil, err + return err } t.state.DataplaneTaskID = taskID response, err := t.scheduler.WaitTask(ctx, execCtx, taskID) if err != nil { - return nil, err + return err } typedResponse, ok := response.(*dataplane_protos.CreateSnapshotFromDiskResponse) if !ok { - return nil, errors.NewNonRetriableErrorf( + return errors.NewNonRetriableErrorf( "invalid create snapshot response type %T", response, ) @@ -188,7 +146,7 @@ func (t *createSnapshotFromDiskTask) run( err = execCtx.SaveState(ctx) if err != nil { - return nil, err + return err } err = t.storage.SnapshotCreated( @@ -199,10 +157,10 @@ func (t *createSnapshotFromDiskTask) run( uint64(t.state.SnapshotStorageSize), ) if err != nil { - return nil, err + return err } - return snapshotMeta, nil + return nil } func (t *createSnapshotFromDiskTask) Run( @@ -213,49 +171,23 @@ func (t *createSnapshotFromDiskTask) Run( disk := t.request.SrcDisk // NOTE: we use snapshot id as checkpoint id. checkpointID := t.request.DstSnapshotId - selfTaskID := execCtx.GetTaskID() nbsClient, err := t.nbsFactory.GetClient(ctx, disk.ZoneId) if err != nil { return err } - snapshotMeta, err := t.run(ctx, execCtx, nbsClient, checkpointID) + err = t.run(ctx, execCtx, nbsClient, checkpointID) if err != nil { return err } - if len(snapshotMeta.BaseSnapshotID) != 0 { - err := t.storage.UnlockSnapshot( - ctx, - snapshotMeta.BaseSnapshotID, - selfTaskID, - ) - if err != nil { - return err - } - - logging.Info( - ctx, - "Successfully unlocked snapshot with id %v", - snapshotMeta.BaseSnapshotID, - ) - } - err = nbsClient.DeleteCheckpointData(ctx, disk.DiskId, checkpointID) if err != nil { return err } - if len(snapshotMeta.BaseCheckpointID) == 0 { - return nil - } - - return nbsClient.DeleteCheckpoint( - ctx, - disk.DiskId, - snapshotMeta.BaseCheckpointID, - ) + return nil } func (t *createSnapshotFromDiskTask) Cancel( @@ -298,24 +230,6 @@ func (t *createSnapshotFromDiskTask) Cancel( ) } - // NBS-3192. - if len(snapshotMeta.BaseSnapshotID) != 0 { - err := t.storage.UnlockSnapshot( - ctx, - snapshotMeta.BaseSnapshotID, - selfTaskID, - ) - if err != nil { - return err - } - - logging.Info( - ctx, - "Successfully unlocked snapshot with id %v", - snapshotMeta.BaseSnapshotID, - ) - } - // Hack for NBS-2225. if snapshotMeta.DeleteTaskID != selfTaskID { return t.scheduler.WaitTaskEnded(ctx, snapshotMeta.DeleteTaskID) diff --git a/cloud/disk_manager/internal/pkg/services/snapshots/delete_snapshot_task.go b/cloud/disk_manager/internal/pkg/services/snapshots/delete_snapshot_task.go index 004dc217e16..1dde741ec5f 100644 --- a/cloud/disk_manager/internal/pkg/services/snapshots/delete_snapshot_task.go +++ b/cloud/disk_manager/internal/pkg/services/snapshots/delete_snapshot_task.go @@ -13,7 +13,6 @@ import ( "github.com/ydb-platform/nbs/cloud/tasks" "github.com/ydb-platform/nbs/cloud/tasks/errors" "github.com/ydb-platform/nbs/cloud/tasks/headers" - "github.com/ydb-platform/nbs/cloud/tasks/logging" ) //////////////////////////////////////////////////////////////////////////////// @@ -66,22 +65,22 @@ func (t *deleteSnapshotTask) deleteSnapshot( } // NBS-3535. - if snapshotMeta.UseDataplaneTasks && len(snapshotMeta.BaseSnapshotID) != 0 { - err := t.storage.UnlockSnapshot( - ctx, - snapshotMeta.BaseSnapshotID, - snapshotMeta.CreateTaskID, - ) - if err != nil { - return err - } - - logging.Info( - ctx, - "Successfully unlocked snapshot with id %v", - snapshotMeta.BaseSnapshotID, - ) - } + // if snapshotMeta.UseDataplaneTasks && len(snapshotMeta.BaseSnapshotID) != 0 { + // err := t.storage.UnlockSnapshot( + // ctx, + // snapshotMeta.BaseSnapshotID, + // snapshotMeta.CreateTaskID, + // ) + // if err != nil { + // return err + // } + + // logging.Info( + // ctx, + // "Successfully unlocked snapshot with id %v", + // snapshotMeta.BaseSnapshotID, + // ) + // } if len(snapshotMeta.CheckpointID) != 0 { nbsClient, err := t.nbsFactory.GetClient(ctx, snapshotMeta.Disk.ZoneId)